http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java deleted file mode 100644 index 957a6d9..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java +++ /dev/null @@ -1,315 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.ksm; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs; -import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo; -import org.apache.hadoop.ozone.ksm.exceptions.KSMException; -import org.apache.hadoop.ozone.protocol.proto - .KeySpaceManagerProtocolProtos.BucketInfo; -import org.apache.hadoop.ozone.OzoneAcl; -import org.apache.hadoop.util.Time; -import org.iq80.leveldb.DBException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; - -/** - * KSM bucket manager. - */ -public class BucketManagerImpl implements BucketManager { - private static final Logger LOG = - LoggerFactory.getLogger(BucketManagerImpl.class); - - /** - * KSMMetadataManager is used for accessing KSM MetadataDB and ReadWriteLock. - */ - private final KSMMetadataManager metadataManager; - - /** - * Constructs BucketManager. - * @param metadataManager - */ - public BucketManagerImpl(KSMMetadataManager metadataManager){ - this.metadataManager = metadataManager; - } - - /** - * MetadataDB is maintained in MetadataManager and shared between - * BucketManager and VolumeManager. (and also by KeyManager) - * - * BucketManager uses MetadataDB to store bucket level information. - * - * Keys used in BucketManager for storing data into MetadataDB - * for BucketInfo: - * {volume/bucket} -> bucketInfo - * - * Work flow of create bucket: - * - * -> Check if the Volume exists in metadataDB, if not throw - * VolumeNotFoundException. - * -> Else check if the Bucket exists in metadataDB, if so throw - * BucketExistException - * -> Else update MetadataDB with VolumeInfo. - */ - - /** - * Creates a bucket. - * @param bucketInfo - KsmBucketInfo. - */ - @Override - public void createBucket(KsmBucketInfo bucketInfo) throws IOException { - Preconditions.checkNotNull(bucketInfo); - metadataManager.writeLock().lock(); - String volumeName = bucketInfo.getVolumeName(); - String bucketName = bucketInfo.getBucketName(); - try { - byte[] volumeKey = metadataManager.getVolumeKey(volumeName); - byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); - - //Check if the volume exists - if (metadataManager.get(volumeKey) == null) { - LOG.debug("volume: {} not found ", volumeName); - throw new KSMException("Volume doesn't exist", - KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - //Check if bucket already exists - if (metadataManager.get(bucketKey) != null) { - LOG.debug("bucket: {} already exists ", bucketName); - throw new KSMException("Bucket already exist", - KSMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS); - } - - KsmBucketInfo ksmBucketInfo = KsmBucketInfo.newBuilder() - .setVolumeName(bucketInfo.getVolumeName()) - .setBucketName(bucketInfo.getBucketName()) - .setAcls(bucketInfo.getAcls()) - .setStorageType(bucketInfo.getStorageType()) - .setIsVersionEnabled(bucketInfo.getIsVersionEnabled()) - .setCreationTime(Time.now()) - .build(); - metadataManager.put(bucketKey, ksmBucketInfo.getProtobuf().toByteArray()); - - LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName); - } catch (IOException | DBException ex) { - if (!(ex instanceof KSMException)) { - LOG.error("Bucket creation failed for bucket:{} in volume:{}", - bucketName, volumeName, ex); - } - throw ex; - } finally { - metadataManager.writeLock().unlock(); - } - } - - /** - * Returns Bucket Information. - * - * @param volumeName - Name of the Volume. - * @param bucketName - Name of the Bucket. - */ - @Override - public KsmBucketInfo getBucketInfo(String volumeName, String bucketName) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - metadataManager.readLock().lock(); - try { - byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); - byte[] value = metadataManager.get(bucketKey); - if (value == null) { - LOG.debug("bucket: {} not found in volume: {}.", bucketName, - volumeName); - throw new KSMException("Bucket not found", - KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); - } - return KsmBucketInfo.getFromProtobuf(BucketInfo.parseFrom(value)); - } catch (IOException | DBException ex) { - if (!(ex instanceof KSMException)) { - LOG.error("Exception while getting bucket info for bucket: {}", - bucketName, ex); - } - throw ex; - } finally { - metadataManager.readLock().unlock(); - } - } - - /** - * Sets bucket property from args. - * @param args - BucketArgs. - * @throws IOException - */ - @Override - public void setBucketProperty(KsmBucketArgs args) throws IOException { - Preconditions.checkNotNull(args); - metadataManager.writeLock().lock(); - String volumeName = args.getVolumeName(); - String bucketName = args.getBucketName(); - try { - byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); - //Check if volume exists - if(metadataManager.get(metadataManager.getVolumeKey(volumeName)) == - null) { - LOG.debug("volume: {} not found ", volumeName); - throw new KSMException("Volume doesn't exist", - KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - byte[] value = metadataManager.get(bucketKey); - //Check if bucket exist - if(value == null) { - LOG.debug("bucket: {} not found ", bucketName); - throw new KSMException("Bucket doesn't exist", - KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); - } - KsmBucketInfo oldBucketInfo = KsmBucketInfo.getFromProtobuf( - BucketInfo.parseFrom(value)); - KsmBucketInfo.Builder bucketInfoBuilder = KsmBucketInfo.newBuilder(); - bucketInfoBuilder.setVolumeName(oldBucketInfo.getVolumeName()) - .setBucketName(oldBucketInfo.getBucketName()); - - //Check ACLs to update - if(args.getAddAcls() != null || args.getRemoveAcls() != null) { - bucketInfoBuilder.setAcls(getUpdatedAclList(oldBucketInfo.getAcls(), - args.getRemoveAcls(), args.getAddAcls())); - LOG.debug("Updating ACLs for bucket: {} in volume: {}", - bucketName, volumeName); - } else { - bucketInfoBuilder.setAcls(oldBucketInfo.getAcls()); - } - - //Check StorageType to update - StorageType storageType = args.getStorageType(); - if (storageType != null) { - bucketInfoBuilder.setStorageType(storageType); - LOG.debug("Updating bucket storage type for bucket: {} in volume: {}", - bucketName, volumeName); - } else { - bucketInfoBuilder.setStorageType(oldBucketInfo.getStorageType()); - } - - //Check Versioning to update - Boolean versioning = args.getIsVersionEnabled(); - if (versioning != null) { - bucketInfoBuilder.setIsVersionEnabled(versioning); - LOG.debug("Updating bucket versioning for bucket: {} in volume: {}", - bucketName, volumeName); - } else { - bucketInfoBuilder - .setIsVersionEnabled(oldBucketInfo.getIsVersionEnabled()); - } - bucketInfoBuilder.setCreationTime(oldBucketInfo.getCreationTime()); - - metadataManager.put(bucketKey, - bucketInfoBuilder.build().getProtobuf().toByteArray()); - } catch (IOException | DBException ex) { - if (!(ex instanceof KSMException)) { - LOG.error("Setting bucket property failed for bucket:{} in volume:{}", - bucketName, volumeName, ex); - } - throw ex; - } finally { - metadataManager.writeLock().unlock(); - } - } - - /** - * Updates the existing ACL list with remove and add ACLs that are passed. - * Remove is done before Add. - * - * @param existingAcls - old ACL list. - * @param removeAcls - ACLs to be removed. - * @param addAcls - ACLs to be added. - * @return updated ACL list. - */ - private List<OzoneAcl> getUpdatedAclList(List<OzoneAcl> existingAcls, - List<OzoneAcl> removeAcls, List<OzoneAcl> addAcls) { - if(removeAcls != null && !removeAcls.isEmpty()) { - existingAcls.removeAll(removeAcls); - } - if(addAcls != null && !addAcls.isEmpty()) { - addAcls.stream().filter(acl -> !existingAcls.contains(acl)).forEach( - existingAcls::add); - } - return existingAcls; - } - - /** - * Deletes an existing empty bucket from volume. - * @param volumeName - Name of the volume. - * @param bucketName - Name of the bucket. - * @throws IOException - */ - public void deleteBucket(String volumeName, String bucketName) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - metadataManager.writeLock().lock(); - try { - byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); - //Check if volume exists - if (metadataManager.get(metadataManager.getVolumeKey(volumeName)) - == null) { - LOG.debug("volume: {} not found ", volumeName); - throw new KSMException("Volume doesn't exist", - KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - //Check if bucket exist - if (metadataManager.get(bucketKey) == null) { - LOG.debug("bucket: {} not found ", bucketName); - throw new KSMException("Bucket doesn't exist", - KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); - } - //Check if bucket is empty - if (!metadataManager.isBucketEmpty(volumeName, bucketName)) { - LOG.debug("bucket: {} is not empty ", bucketName); - throw new KSMException("Bucket is not empty", - KSMException.ResultCodes.FAILED_BUCKET_NOT_EMPTY); - } - metadataManager.delete(bucketKey); - } catch (IOException ex) { - if (!(ex instanceof KSMException)) { - LOG.error("Delete bucket failed for bucket:{} in volume:{}", bucketName, - volumeName, ex); - } - throw ex; - } finally { - metadataManager.writeLock().unlock(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public List<KsmBucketInfo> listBuckets(String volumeName, - String startBucket, String bucketPrefix, int maxNumOfBuckets) - throws IOException { - Preconditions.checkNotNull(volumeName); - metadataManager.readLock().lock(); - try { - return metadataManager.listBuckets( - volumeName, startBucket, bucketPrefix, maxNumOfBuckets); - } finally { - metadataManager.readLock().unlock(); - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java deleted file mode 100644 index 42331f6..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMXBean.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.ksm; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.jmx.ServiceRuntimeInfo; - -/** - * This is the JMX management interface for ksm information. - */ -@InterfaceAudience.Private -public interface KSMMXBean extends ServiceRuntimeInfo { - - String getRpcPort(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java deleted file mode 100644 index f5a2d5b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.ksm; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo; -import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; -import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.locks.Lock; - -/** - * KSM metadata manager interface. - */ -public interface KSMMetadataManager { - /** - * Start metadata manager. - */ - void start(); - - /** - * Stop metadata manager. - */ - void stop() throws IOException; - - /** - * Get metadata store. - * @return metadata store. - */ - @VisibleForTesting - MetadataStore getStore(); - - /** - * Returns the read lock used on Metadata DB. - * @return readLock - */ - Lock readLock(); - - /** - * Returns the write lock used on Metadata DB. - * @return writeLock - */ - Lock writeLock(); - - /** - * Returns the value associated with this key. - * @param key - key - * @return value - */ - byte[] get(byte[] key) throws IOException; - - /** - * Puts a Key into Metadata DB. - * @param key - key - * @param value - value - */ - void put(byte[] key, byte[] value) throws IOException; - - /** - * Deletes a Key from Metadata DB. - * @param key - key - */ - void delete(byte[] key) throws IOException; - - /** - * Atomic write a batch of operations. - * @param batch - * @throws IOException - */ - void writeBatch(BatchOperation batch) throws IOException; - - /** - * Given a volume return the corresponding DB key. - * @param volume - Volume name - */ - byte[] getVolumeKey(String volume); - - /** - * Given a user return the corresponding DB key. - * @param user - User name - */ - byte[] getUserKey(String user); - - /** - * Given a volume and bucket, return the corresponding DB key. - * @param volume - User name - * @param bucket - Bucket name - */ - byte[] getBucketKey(String volume, String bucket); - - /** - * Given a volume, bucket and a key, return the corresponding DB key. - * @param volume - volume name - * @param bucket - bucket name - * @param key - key name - * @return bytes of DB key. - */ - byte[] getDBKeyBytes(String volume, String bucket, String key); - - /** - * Returns the DB key name of a deleted key in KSM metadata store. - * The name for a deleted key has prefix #deleting# followed by - * the actual key name. - * @param keyName - key name - * @return bytes of DB key. - */ - byte[] getDeletedKeyName(byte[] keyName); - - /** - * Returns the DB key name of a open key in KSM metadata store. - * Should be #open# prefix followed by actual key name. - * @param keyName - key name - * @param id - the id for this open - * @return bytes of DB key. - */ - byte[] getOpenKeyNameBytes(String keyName, int id); - - /** - * Returns the full name of a key given volume name, bucket name and key name. - * Generally done by padding certain delimiters. - * - * @param volumeName - volume name - * @param bucketName - bucket name - * @param keyName - key name - * @return the full key name. - */ - String getKeyWithDBPrefix(String volumeName, String bucketName, - String keyName); - - /** - * Given a volume, check if it is empty, - * i.e there are no buckets inside it. - * @param volume - Volume name - */ - boolean isVolumeEmpty(String volume) throws IOException; - - /** - * Given a volume/bucket, check if it is empty, - * i.e there are no keys inside it. - * @param volume - Volume name - * @param bucket - Bucket name - * @return true if the bucket is empty - */ - boolean isBucketEmpty(String volume, String bucket) throws IOException; - - /** - * Returns a list of buckets represented by {@link KsmBucketInfo} - * in the given volume. - * - * @param volumeName - * the name of the volume. This argument is required, - * this method returns buckets in this given volume. - * @param startBucket - * the start bucket name. Only the buckets whose name is - * after this value will be included in the result. - * This key is excluded from the result. - * @param bucketPrefix - * bucket name prefix. Only the buckets whose name has - * this prefix will be included in the result. - * @param maxNumOfBuckets - * the maximum number of buckets to return. It ensures - * the size of the result will not exceed this limit. - * @return a list of buckets. - * @throws IOException - */ - List<KsmBucketInfo> listBuckets(String volumeName, String startBucket, - String bucketPrefix, int maxNumOfBuckets) throws IOException; - - /** - * Returns a list of keys represented by {@link KsmKeyInfo} - * in the given bucket. - * - * @param volumeName - * the name of the volume. - * @param bucketName - * the name of the bucket. - * @param startKey - * the start key name, only the keys whose name is - * after this value will be included in the result. - * This key is excluded from the result. - * @param keyPrefix - * key name prefix, only the keys whose name has - * this prefix will be included in the result. - * @param maxKeys - * the maximum number of keys to return. It ensures - * the size of the result will not exceed this limit. - * @return a list of keys. - * @throws IOException - */ - List<KsmKeyInfo> listKeys(String volumeName, - String bucketName, String startKey, String keyPrefix, int maxKeys) - throws IOException; - - /** - * Returns a list of volumes owned by a given user; if user is null, - * returns all volumes. - * - * @param userName - * volume owner - * @param prefix - * the volume prefix used to filter the listing result. - * @param startKey - * the start volume name determines where to start listing from, - * this key is excluded from the result. - * @param maxKeys - * the maximum number of volumes to return. - * @return a list of {@link KsmVolumeArgs} - * @throws IOException - */ - List<KsmVolumeArgs> listVolumes(String userName, String prefix, - String startKey, int maxKeys) throws IOException; - - /** - * Returns a list of pending deletion key info that ups to the given count. - * Each entry is a {@link BlockGroup}, which contains the info about the - * key name and all its associated block IDs. A pending deletion key is - * stored with #deleting# prefix in KSM DB. - * - * @param count max number of keys to return. - * @return a list of {@link BlockGroup} represent keys and blocks. - * @throws IOException - */ - List<BlockGroup> getPendingDeletionKeys(int count) throws IOException; - - /** - * Returns a list of all still open key info. Which contains the info about - * the key name and all its associated block IDs. A pending open key has - * prefix #open# in KSM DB. - * - * @return a list of {@link BlockGroup} representing keys and blocks. - * @throws IOException - */ - List<BlockGroup> getExpiredOpenKeys() throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java deleted file mode 100644 index df02182..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java +++ /dev/null @@ -1,524 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.ksm; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; -import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo; -import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup; -import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs; -import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; -import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.ksm.exceptions.KSMException; -import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes; -import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.BucketInfo; -import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo; -import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo; -import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList; -import org.apache.hadoop.ozone.web.utils.OzoneUtils; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataKeyFilters; -import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; -import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; -import org.apache.hadoop.utils.MetadataStore; -import org.apache.hadoop.utils.MetadataStoreBuilder; - -import java.io.File; -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX; -import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME; -import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR; -import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_PREFIX; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys - .OZONE_KSM_DB_CACHE_SIZE_DEFAULT; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys - .OZONE_KSM_DB_CACHE_SIZE_MB; - -/** - * KSM metadata manager interface. - */ -public class KSMMetadataManagerImpl implements KSMMetadataManager { - - private final MetadataStore store; - private final ReadWriteLock lock; - private final long openKeyExpireThresholdMS; - - public KSMMetadataManagerImpl(OzoneConfiguration conf) throws IOException { - File metaDir = OzoneUtils.getOzoneMetaDirPath(conf); - final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB, - OZONE_KSM_DB_CACHE_SIZE_DEFAULT); - File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME); - this.store = MetadataStoreBuilder.newBuilder() - .setConf(conf) - .setDbFile(ksmDBFile) - .setCacheSize(cacheSize * OzoneConsts.MB) - .build(); - this.lock = new ReentrantReadWriteLock(); - this.openKeyExpireThresholdMS = 1000 * conf.getInt( - OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, - OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT); - } - - /** - * Start metadata manager. - */ - @Override - public void start() { - - } - - /** - * Stop metadata manager. - */ - @Override - public void stop() throws IOException { - if (store != null) { - store.close(); - } - } - - /** - * Get metadata store. - * @return store - metadata store. - */ - @VisibleForTesting - @Override - public MetadataStore getStore() { - return store; - } - - /** - * Given a volume return the corresponding DB key. - * @param volume - Volume name - */ - public byte[] getVolumeKey(String volume) { - String dbVolumeName = OzoneConsts.KSM_VOLUME_PREFIX + volume; - return DFSUtil.string2Bytes(dbVolumeName); - } - - /** - * Given a user return the corresponding DB key. - * @param user - User name - */ - public byte[] getUserKey(String user) { - String dbUserName = OzoneConsts.KSM_USER_PREFIX + user; - return DFSUtil.string2Bytes(dbUserName); - } - - /** - * Given a volume and bucket, return the corresponding DB key. - * @param volume - User name - * @param bucket - Bucket name - */ - public byte[] getBucketKey(String volume, String bucket) { - String bucketKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume - + OzoneConsts.KSM_BUCKET_PREFIX + bucket; - return DFSUtil.string2Bytes(bucketKeyString); - } - - /** - * @param volume - * @param bucket - * @return - */ - private String getBucketWithDBPrefix(String volume, String bucket) { - StringBuffer sb = new StringBuffer(); - sb.append(OzoneConsts.KSM_VOLUME_PREFIX) - .append(volume) - .append(OzoneConsts.KSM_BUCKET_PREFIX); - if (!Strings.isNullOrEmpty(bucket)) { - sb.append(bucket); - } - return sb.toString(); - } - - @Override - public String getKeyWithDBPrefix(String volume, String bucket, String key) { - String keyVB = OzoneConsts.KSM_KEY_PREFIX + volume - + OzoneConsts.KSM_KEY_PREFIX + bucket - + OzoneConsts.KSM_KEY_PREFIX; - return Strings.isNullOrEmpty(key) ? keyVB : keyVB + key; - } - - @Override - public byte[] getDBKeyBytes(String volume, String bucket, String key) { - return DFSUtil.string2Bytes(getKeyWithDBPrefix(volume, bucket, key)); - } - - @Override - public byte[] getDeletedKeyName(byte[] keyName) { - return DFSUtil.string2Bytes( - DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName)); - } - - @Override - public byte[] getOpenKeyNameBytes(String keyName, int id) { - return DFSUtil.string2Bytes(OPEN_KEY_PREFIX + id + - OPEN_KEY_ID_DELIMINATOR + keyName); - } - - /** - * Returns the read lock used on Metadata DB. - * @return readLock - */ - @Override - public Lock readLock() { - return lock.readLock(); - } - - /** - * Returns the write lock used on Metadata DB. - * @return writeLock - */ - @Override - public Lock writeLock() { - return lock.writeLock(); - } - - /** - * Returns the value associated with this key. - * @param key - key - * @return value - */ - @Override - public byte[] get(byte[] key) throws IOException { - return store.get(key); - } - - /** - * Puts a Key into Metadata DB. - * @param key - key - * @param value - value - */ - @Override - public void put(byte[] key, byte[] value) throws IOException { - store.put(key, value); - } - - /** - * Deletes a Key from Metadata DB. - * @param key - key - */ - public void delete(byte[] key) throws IOException { - store.delete(key); - } - - @Override - public void writeBatch(BatchOperation batch) throws IOException { - this.store.writeBatch(batch); - } - - /** - * Given a volume, check if it is empty, i.e there are no buckets inside it. - * @param volume - Volume name - * @return true if the volume is empty - */ - public boolean isVolumeEmpty(String volume) throws IOException { - String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume - + OzoneConsts.KSM_BUCKET_PREFIX; - byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName); - ImmutablePair<byte[], byte[]> volumeRoot = - store.peekAround(0, dbVolumeRootKey); - if (volumeRoot != null) { - return !DFSUtil.bytes2String(volumeRoot.getKey()) - .startsWith(dbVolumeRootName); - } - return true; - } - - /** - * Given a volume/bucket, check if it is empty, - * i.e there are no keys inside it. - * @param volume - Volume name - * @param bucket - Bucket name - * @return true if the bucket is empty - */ - public boolean isBucketEmpty(String volume, String bucket) - throws IOException { - String keyRootName = getKeyWithDBPrefix(volume, bucket, null); - byte[] keyRoot = DFSUtil.string2Bytes(keyRootName); - ImmutablePair<byte[], byte[]> firstKey = store.peekAround(0, keyRoot); - if (firstKey != null) { - return !DFSUtil.bytes2String(firstKey.getKey()) - .startsWith(keyRootName); - } - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public List<KsmBucketInfo> listBuckets(final String volumeName, - final String startBucket, final String bucketPrefix, - final int maxNumOfBuckets) throws IOException { - List<KsmBucketInfo> result = new ArrayList<>(); - if (Strings.isNullOrEmpty(volumeName)) { - throw new KSMException("Volume name is required.", - ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - - byte[] volumeNameBytes = getVolumeKey(volumeName); - if (store.get(volumeNameBytes) == null) { - throw new KSMException("Volume " + volumeName + " not found.", - ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - - - // A bucket starts with /#volume/#bucket_prefix - MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> { - if (currentKey != null) { - String bucketNamePrefix = - getBucketWithDBPrefix(volumeName, bucketPrefix); - String bucket = DFSUtil.bytes2String(currentKey); - return bucket.startsWith(bucketNamePrefix); - } - return false; - }; - - List<Map.Entry<byte[], byte[]>> rangeResult; - if (!Strings.isNullOrEmpty(startBucket)) { - // Since we are excluding start key from the result, - // the maxNumOfBuckets is incremented. - rangeResult = store.getSequentialRangeKVs( - getBucketKey(volumeName, startBucket), - maxNumOfBuckets + 1, filter); - if (!rangeResult.isEmpty()) { - //Remove start key from result. - rangeResult.remove(0); - } - } else { - rangeResult = store.getSequentialRangeKVs(null, maxNumOfBuckets, filter); - } - - for (Map.Entry<byte[], byte[]> entry : rangeResult) { - KsmBucketInfo info = KsmBucketInfo.getFromProtobuf( - BucketInfo.parseFrom(entry.getValue())); - result.add(info); - } - return result; - } - - @Override - public List<KsmKeyInfo> listKeys(String volumeName, String bucketName, - String startKey, String keyPrefix, int maxKeys) throws IOException { - List<KsmKeyInfo> result = new ArrayList<>(); - if (Strings.isNullOrEmpty(volumeName)) { - throw new KSMException("Volume name is required.", - ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - - if (Strings.isNullOrEmpty(bucketName)) { - throw new KSMException("Bucket name is required.", - ResultCodes.FAILED_BUCKET_NOT_FOUND); - } - - byte[] bucketNameBytes = getBucketKey(volumeName, bucketName); - if (store.get(bucketNameBytes) == null) { - throw new KSMException("Bucket " + bucketName + " not found.", - ResultCodes.FAILED_BUCKET_NOT_FOUND); - } - - MetadataKeyFilter filter = new KeyPrefixFilter( - getKeyWithDBPrefix(volumeName, bucketName, keyPrefix)); - - List<Map.Entry<byte[], byte[]>> rangeResult; - if (!Strings.isNullOrEmpty(startKey)) { - //Since we are excluding start key from the result, - // the maxNumOfBuckets is incremented. - rangeResult = store.getSequentialRangeKVs( - getDBKeyBytes(volumeName, bucketName, startKey), - maxKeys + 1, filter); - if (!rangeResult.isEmpty()) { - //Remove start key from result. - rangeResult.remove(0); - } - } else { - rangeResult = store.getSequentialRangeKVs(null, maxKeys, filter); - } - - for (Map.Entry<byte[], byte[]> entry : rangeResult) { - KsmKeyInfo info = KsmKeyInfo.getFromProtobuf( - KeyInfo.parseFrom(entry.getValue())); - result.add(info); - } - return result; - } - - @Override - public List<KsmVolumeArgs> listVolumes(String userName, - String prefix, String startKey, int maxKeys) throws IOException { - List<KsmVolumeArgs> result = Lists.newArrayList(); - VolumeList volumes; - if (Strings.isNullOrEmpty(userName)) { - volumes = getAllVolumes(); - } else { - volumes = getVolumesByUser(userName); - } - - if (volumes == null || volumes.getVolumeNamesCount() == 0) { - return result; - } - - boolean startKeyFound = Strings.isNullOrEmpty(startKey); - for (String volumeName : volumes.getVolumeNamesList()) { - if (!Strings.isNullOrEmpty(prefix)) { - if (!volumeName.startsWith(prefix)) { - continue; - } - } - - if (!startKeyFound && volumeName.equals(startKey)) { - startKeyFound = true; - continue; - } - if (startKeyFound && result.size() < maxKeys) { - byte[] volumeInfo = store.get(this.getVolumeKey(volumeName)); - if (volumeInfo == null) { - // Could not get volume info by given volume name, - // since the volume name is loaded from db, - // this probably means ksm db is corrupted or some entries are - // accidentally removed. - throw new KSMException("Volume info not found for " + volumeName, - ResultCodes.FAILED_VOLUME_NOT_FOUND); - } - VolumeInfo info = VolumeInfo.parseFrom(volumeInfo); - KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(info); - result.add(volumeArgs); - } - } - - return result; - } - - private VolumeList getVolumesByUser(String userName) - throws KSMException { - return getVolumesByUser(getUserKey(userName)); - } - - private VolumeList getVolumesByUser(byte[] userNameKey) - throws KSMException { - VolumeList volumes = null; - try { - byte[] volumesInBytes = store.get(userNameKey); - if (volumesInBytes == null) { - // No volume found for this user, return an empty list - return VolumeList.newBuilder().build(); - } - volumes = VolumeList.parseFrom(volumesInBytes); - } catch (IOException e) { - throw new KSMException("Unable to get volumes info by the given user, " - + "metadata might be corrupted", e, - ResultCodes.FAILED_METADATA_ERROR); - } - return volumes; - } - - private VolumeList getAllVolumes() throws IOException { - // Scan all users in database - KeyPrefixFilter filter = new KeyPrefixFilter(OzoneConsts.KSM_USER_PREFIX); - // We are not expecting a huge number of users per cluster, - // it should be fine to scan all users in db and return us a - // list of volume names in string per user. - List<Map.Entry<byte[], byte[]>> rangeKVs = store - .getSequentialRangeKVs(null, Integer.MAX_VALUE, filter); - - VolumeList.Builder builder = VolumeList.newBuilder(); - for (Map.Entry<byte[], byte[]> entry : rangeKVs) { - VolumeList volumes = this.getVolumesByUser(entry.getKey()); - builder.addAllVolumeNames(volumes.getVolumeNamesList()); - } - - return builder.build(); - } - - @Override - public List<BlockGroup> getPendingDeletionKeys(final int count) - throws IOException { - List<BlockGroup> keyBlocksList = Lists.newArrayList(); - List<Map.Entry<byte[], byte[]>> rangeResult = - store.getRangeKVs(null, count, - MetadataKeyFilters.getDeletingKeyFilter()); - for (Map.Entry<byte[], byte[]> entry : rangeResult) { - KsmKeyInfo info = - KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue())); - // Get block keys as a list. - KsmKeyLocationInfoGroup latest = info.getLatestVersionLocations(); - if (latest == null) { - return Collections.emptyList(); - } - List<String> item = latest.getLocationList().stream() - .map(KsmKeyLocationInfo::getBlockID) - .collect(Collectors.toList()); - BlockGroup keyBlocks = BlockGroup.newBuilder() - .setKeyName(DFSUtil.bytes2String(entry.getKey())) - .addAllBlockIDs(item) - .build(); - keyBlocksList.add(keyBlocks); - } - return keyBlocksList; - } - - @Override - public List<BlockGroup> getExpiredOpenKeys() throws IOException { - List<BlockGroup> keyBlocksList = Lists.newArrayList(); - long now = Time.now(); - final MetadataKeyFilter openKeyFilter = - new KeyPrefixFilter(OPEN_KEY_PREFIX); - List<Map.Entry<byte[], byte[]>> rangeResult = - store.getSequentialRangeKVs(null, Integer.MAX_VALUE, - openKeyFilter); - for (Map.Entry<byte[], byte[]> entry : rangeResult) { - KsmKeyInfo info = - KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue())); - long lastModify = info.getModificationTime(); - if (now - lastModify < this.openKeyExpireThresholdMS) { - // consider as may still be active, not hanging. - continue; - } - // Get block keys as a list. - List<String> item = info.getLatestVersionLocations() - .getBlocksLatestVersionOnly().stream() - .map(KsmKeyLocationInfo::getBlockID) - .collect(Collectors.toList()); - BlockGroup keyBlocks = BlockGroup.newBuilder() - .setKeyName(DFSUtil.bytes2String(entry.getKey())) - .addAllBlockIDs(item) - .build(); - keyBlocksList.add(keyBlocks); - } - return keyBlocksList; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java deleted file mode 100644 index bd29012..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java +++ /dev/null @@ -1,437 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.ksm; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; - -/** - * This class is for maintaining KeySpaceManager statistics. - */ -@InterfaceAudience.Private -@Metrics(about="Key Space Manager Metrics", context="dfs") -public class KSMMetrics { - private static final String SOURCE_NAME = - KSMMetrics.class.getSimpleName(); - - // KSM request type op metrics - private @Metric MutableCounterLong numVolumeOps; - private @Metric MutableCounterLong numBucketOps; - private @Metric MutableCounterLong numKeyOps; - - // KSM op metrics - private @Metric MutableCounterLong numVolumeCreates; - private @Metric MutableCounterLong numVolumeUpdates; - private @Metric MutableCounterLong numVolumeInfos; - private @Metric MutableCounterLong numVolumeCheckAccesses; - private @Metric MutableCounterLong numBucketCreates; - private @Metric MutableCounterLong numVolumeDeletes; - private @Metric MutableCounterLong numBucketInfos; - private @Metric MutableCounterLong numBucketUpdates; - private @Metric MutableCounterLong numBucketDeletes; - private @Metric MutableCounterLong numKeyAllocate; - private @Metric MutableCounterLong numKeyLookup; - private @Metric MutableCounterLong numKeyDeletes; - private @Metric MutableCounterLong numBucketLists; - private @Metric MutableCounterLong numKeyLists; - private @Metric MutableCounterLong numVolumeLists; - private @Metric MutableCounterLong numKeyCommits; - private @Metric MutableCounterLong numAllocateBlockCalls; - private @Metric MutableCounterLong numGetServiceLists; - - // Failure Metrics - private @Metric MutableCounterLong numVolumeCreateFails; - private @Metric MutableCounterLong numVolumeUpdateFails; - private @Metric MutableCounterLong numVolumeInfoFails; - private @Metric MutableCounterLong numVolumeDeleteFails; - private @Metric MutableCounterLong numBucketCreateFails; - private @Metric MutableCounterLong numVolumeCheckAccessFails; - private @Metric MutableCounterLong numBucketInfoFails; - private @Metric MutableCounterLong numBucketUpdateFails; - private @Metric MutableCounterLong numBucketDeleteFails; - private @Metric MutableCounterLong numKeyAllocateFails; - private @Metric MutableCounterLong numKeyLookupFails; - private @Metric MutableCounterLong numKeyDeleteFails; - private @Metric MutableCounterLong numBucketListFails; - private @Metric MutableCounterLong numKeyListFails; - private @Metric MutableCounterLong numVolumeListFails; - private @Metric MutableCounterLong numKeyCommitFails; - private @Metric MutableCounterLong numBlockAllocateCallFails; - private @Metric MutableCounterLong numGetServiceListFails; - - public KSMMetrics() { - } - - public static KSMMetrics create() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - return ms.register(SOURCE_NAME, - "Key Space Manager Metrics", - new KSMMetrics()); - } - - public void incNumVolumeCreates() { - numVolumeOps.incr(); - numVolumeCreates.incr(); - } - - public void incNumVolumeUpdates() { - numVolumeOps.incr(); - numVolumeUpdates.incr(); - } - - public void incNumVolumeInfos() { - numVolumeOps.incr(); - numVolumeInfos.incr(); - } - - public void incNumVolumeDeletes() { - numVolumeOps.incr(); - numVolumeDeletes.incr(); - } - - public void incNumVolumeCheckAccesses() { - numVolumeOps.incr(); - numVolumeCheckAccesses.incr(); - } - - public void incNumBucketCreates() { - numBucketOps.incr(); - numBucketCreates.incr(); - } - - public void incNumBucketInfos() { - numBucketOps.incr(); - numBucketInfos.incr(); - } - - public void incNumBucketUpdates() { - numBucketOps.incr(); - numBucketUpdates.incr(); - } - - public void incNumBucketDeletes() { - numBucketOps.incr(); - numBucketDeletes.incr(); - } - - public void incNumBucketLists() { - numBucketOps.incr(); - numBucketLists.incr(); - } - - public void incNumKeyLists() { - numKeyOps.incr(); - numKeyLists.incr(); - } - - public void incNumVolumeLists() { - numVolumeOps.incr(); - numVolumeLists.incr(); - } - - public void incNumGetServiceLists() { - numGetServiceLists.incr(); - } - - public void incNumVolumeCreateFails() { - numVolumeCreateFails.incr(); - } - - public void incNumVolumeUpdateFails() { - numVolumeUpdateFails.incr(); - } - - public void incNumVolumeInfoFails() { - numVolumeInfoFails.incr(); - } - - public void incNumVolumeDeleteFails() { - numVolumeDeleteFails.incr(); - } - - public void incNumVolumeCheckAccessFails() { - numVolumeCheckAccessFails.incr(); - } - - public void incNumBucketCreateFails() { - numBucketCreateFails.incr(); - } - - public void incNumBucketInfoFails() { - numBucketInfoFails.incr(); - } - - public void incNumBucketUpdateFails() { - numBucketUpdateFails.incr(); - } - - public void incNumBucketDeleteFails() { - numBucketDeleteFails.incr(); - } - - public void incNumKeyAllocates() { - numKeyOps.incr(); - numKeyAllocate.incr(); - } - - public void incNumKeyAllocateFails() { - numKeyAllocateFails.incr(); - } - - public void incNumKeyLookups() { - numKeyOps.incr(); - numKeyLookup.incr(); - } - - public void incNumKeyLookupFails() { - numKeyLookupFails.incr(); - } - - public void incNumKeyDeleteFails() { - numKeyDeleteFails.incr(); - } - - public void incNumKeyDeletes() { - numKeyOps.incr(); - numKeyDeletes.incr(); - } - - public void incNumKeyCommits() { - numKeyOps.incr(); - numKeyCommits.incr(); - } - - public void incNumKeyCommitFails() { - numKeyCommitFails.incr(); - } - - public void incNumBlockAllocateCalls() { - numAllocateBlockCalls.incr(); - } - - public void incNumBlockAllocateCallFails() { - numBlockAllocateCallFails.incr(); - } - - public void incNumBucketListFails() { - numBucketListFails.incr(); - } - - public void incNumKeyListFails() { - numKeyListFails.incr(); - } - - public void incNumVolumeListFails() { - numVolumeListFails.incr(); - } - - public void incNumGetServiceListFails() { - numGetServiceListFails.incr(); - } - - @VisibleForTesting - public long getNumVolumeCreates() { - return numVolumeCreates.value(); - } - - @VisibleForTesting - public long getNumVolumeUpdates() { - return numVolumeUpdates.value(); - } - - @VisibleForTesting - public long getNumVolumeInfos() { - return numVolumeInfos.value(); - } - - @VisibleForTesting - public long getNumVolumeDeletes() { - return numVolumeDeletes.value(); - } - - @VisibleForTesting - public long getNumVolumeCheckAccesses() { - return numVolumeCheckAccesses.value(); - } - - @VisibleForTesting - public long getNumBucketCreates() { - return numBucketCreates.value(); - } - - @VisibleForTesting - public long getNumBucketInfos() { - return numBucketInfos.value(); - } - - @VisibleForTesting - public long getNumBucketUpdates() { - return numBucketUpdates.value(); - } - - @VisibleForTesting - public long getNumBucketDeletes() { - return numBucketDeletes.value(); - } - - @VisibleForTesting - public long getNumBucketLists() { - return numBucketLists.value(); - } - - @VisibleForTesting - public long getNumVolumeLists() { - return numVolumeLists.value(); - } - - @VisibleForTesting - public long getNumKeyLists() { - return numKeyLists.value(); - } - - @VisibleForTesting - public long getNumGetServiceLists() { - return numGetServiceLists.value(); - } - - @VisibleForTesting - public long getNumVolumeCreateFails() { - return numVolumeCreateFails.value(); - } - - @VisibleForTesting - public long getNumVolumeUpdateFails() { - return numVolumeUpdateFails.value(); - } - - @VisibleForTesting - public long getNumVolumeInfoFails() { - return numVolumeInfoFails.value(); - } - - @VisibleForTesting - public long getNumVolumeDeleteFails() { - return numVolumeDeleteFails.value(); - } - - @VisibleForTesting - public long getNumVolumeCheckAccessFails() { - return numVolumeCheckAccessFails.value(); - } - - @VisibleForTesting - public long getNumBucketCreateFails() { - return numBucketCreateFails.value(); - } - - @VisibleForTesting - public long getNumBucketInfoFails() { - return numBucketInfoFails.value(); - } - - @VisibleForTesting - public long getNumBucketUpdateFails() { - return numBucketUpdateFails.value(); - } - - @VisibleForTesting - public long getNumBucketDeleteFails() { - return numBucketDeleteFails.value(); - } - - @VisibleForTesting - public long getNumKeyAllocates() { - return numKeyAllocate.value(); - } - - @VisibleForTesting - public long getNumKeyAllocateFails() { - return numKeyAllocateFails.value(); - } - - @VisibleForTesting - public long getNumKeyLookups() { - return numKeyLookup.value(); - } - - @VisibleForTesting - public long getNumKeyLookupFails() { - return numKeyLookupFails.value(); - } - - @VisibleForTesting - public long getNumKeyDeletes() { - return numKeyDeletes.value(); - } - - @VisibleForTesting - public long getNumKeyDeletesFails() { - return numKeyDeleteFails.value(); - } - - @VisibleForTesting - public long getNumBucketListFails() { - return numBucketListFails.value(); - } - - @VisibleForTesting - public long getNumKeyListFails() { - return numKeyListFails.value(); - } - - @VisibleForTesting - public long getNumVolumeListFails() { - return numVolumeListFails.value(); - } - - @VisibleForTesting - public long getNumKeyCommits() { - return numKeyCommits.value(); - } - - @VisibleForTesting - public long getNumKeyCommitFails() { - return numKeyCommitFails.value(); - } - - @VisibleForTesting - public long getNumBlockAllocates() { - return numAllocateBlockCalls.value(); - } - - @VisibleForTesting - public long getNumBlockAllocateFails() { - return numBlockAllocateCallFails.value(); - } - - @VisibleForTesting - public long getNumGetServiceListFails() { - return numGetServiceListFails.value(); - } - - public void unRegister() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - ms.unregisterSource(SOURCE_NAME); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java deleted file mode 100644 index 8219212..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMStorage.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.ksm; - -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.common.Storage; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeType; -import org.apache.hadoop.ozone.scm.SCMStorage; -import org.apache.hadoop.ozone.web.utils.OzoneUtils; - -import java.io.IOException; -import java.util.Properties; -import java.util.UUID; - -/** - * KSMStorage is responsible for management of the StorageDirectories used by - * the KSM. - */ -public class KSMStorage extends Storage { - - public static final String STORAGE_DIR = "ksm"; - public static final String KSM_ID = "ksmUuid"; - - /** - * Construct KSMStorage. - * @throws IOException if any directories are inaccessible. - */ - public KSMStorage(OzoneConfiguration conf) throws IOException { - super(NodeType.KSM, OzoneUtils.getOzoneMetaDirPath(conf), STORAGE_DIR); - } - - public void setScmId(String scmId) throws IOException { - if (getState() == StorageState.INITIALIZED) { - throw new IOException("KSM is already initialized."); - } else { - getStorageInfo().setProperty(SCMStorage.SCM_ID, scmId); - } - } - - public void setKsmId(String ksmId) throws IOException { - if (getState() == StorageState.INITIALIZED) { - throw new IOException("KSM is already initialized."); - } else { - getStorageInfo().setProperty(KSM_ID, ksmId); - } - } - - /** - * Retrieves the SCM ID from the version file. - * @return SCM_ID - */ - public String getScmId() { - return getStorageInfo().getProperty(SCMStorage.SCM_ID); - } - - /** - * Retrieves the KSM ID from the version file. - * @return KSM_ID - */ - public String getKsmId() { - return getStorageInfo().getProperty(KSM_ID); - } - - @Override - protected Properties getNodeProperties() { - String ksmId = getKsmId(); - if (ksmId == null) { - ksmId = UUID.randomUUID().toString(); - } - Properties ksmProperties = new Properties(); - ksmProperties.setProperty(KSM_ID, ksmId); - return ksmProperties; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java deleted file mode 100644 index 3beaed4..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.ksm; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; -import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BackgroundService; -import org.apache.hadoop.utils.BackgroundTask; -import org.apache.hadoop.utils.BackgroundTaskQueue; -import org.apache.hadoop.utils.BackgroundTaskResult; -import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT; - -/** - * This is the background service to delete keys. - * Scan the metadata of ksm periodically to get - * the keys with prefix "#deleting" and ask scm to - * delete metadata accordingly, if scm returns - * success for keys, then clean up those keys. - */ -public class KeyDeletingService extends BackgroundService { - - private static final Logger LOG = - LoggerFactory.getLogger(KeyDeletingService.class); - - // The thread pool size for key deleting service. - private final static int KEY_DELETING_CORE_POOL_SIZE = 2; - - private final ScmBlockLocationProtocol scmClient; - private final KeyManager manager; - private final int keyLimitPerTask; - - public KeyDeletingService(ScmBlockLocationProtocol scmClient, - KeyManager manager, long serviceInterval, - long serviceTimeout, Configuration conf) { - super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS, - KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); - this.scmClient = scmClient; - this.manager = manager; - this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK, - OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); - } - - @Override - public BackgroundTaskQueue getTasks() { - BackgroundTaskQueue queue = new BackgroundTaskQueue(); - queue.add(new KeyDeletingTask()); - return queue; - } - - /** - * A key deleting task scans KSM DB and looking for a certain number - * of pending-deletion keys, sends these keys along with their associated - * blocks to SCM for deletion. Once SCM confirms keys are deleted (once - * SCM persisted the blocks info in its deletedBlockLog), it removes - * these keys from the DB. - */ - private class KeyDeletingTask implements - BackgroundTask<BackgroundTaskResult> { - - @Override - public int getPriority() { - return 0; - } - - @Override - public BackgroundTaskResult call() throws Exception { - try { - long startTime = Time.monotonicNow(); - List<BlockGroup> keyBlocksList = manager - .getPendingDeletionKeys(keyLimitPerTask); - if (keyBlocksList.size() > 0) { - LOG.info("Found {} to-delete keys in KSM", keyBlocksList.size()); - List<DeleteBlockGroupResult> results = - scmClient.deleteKeyBlocks(keyBlocksList); - for (DeleteBlockGroupResult result : results) { - if (result.isSuccess()) { - try { - // Purge key from KSM DB. - manager.deletePendingDeletionKey(result.getObjectKey()); - LOG.debug("Key {} deleted from KSM DB", result.getObjectKey()); - } catch (IOException e) { - // if a pending deletion key is failed to delete, - // print a warning here and retain it in this state, - // so that it can be attempt to delete next time. - LOG.warn("Failed to delete pending-deletion key {}", - result.getObjectKey(), e); - } - } else { - // Key deletion failed, retry in next interval. - LOG.warn("Key {} deletion failed because some of the blocks" - + " were failed to delete, failed blocks: {}", - result.getObjectKey(), - String.join(",", result.getFailedBlocks())); - } - } - - if (!results.isEmpty()) { - LOG.info("Number of key deleted from KSM DB: {}," - + " task elapsed time: {}ms", - results.size(), Time.monotonicNow() - startTime); - } - - return results::size; - } else { - LOG.debug("No pending deletion key found in KSM"); - } - } catch (IOException e) { - LOG.error("Unable to get pending deletion keys, retry in" - + " next interval", e); - } - return EmptyTaskResult.newResult(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java deleted file mode 100644 index e71ce5f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.ksm; - -import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; -import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; -import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; -import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession; - -import java.io.IOException; -import java.util.List; - -/** - * Handles key level commands. - */ -public interface KeyManager { - - /** - * Start key manager. - */ - void start(); - - /** - * Stop key manager. - */ - void stop() throws IOException; - - /** - * After calling commit, the key will be made visible. There can be multiple - * open key writes in parallel (identified by client id). The most recently - * committed one will be the one visible. - * - * @param args the key to commit. - * @param clientID the client that is committing. - * @throws IOException - */ - void commitKey(KsmKeyArgs args, int clientID) throws IOException; - - /** - * A client calls this on an open key, to request to allocate a new block, - * and appended to the tail of current block list of the open client. - * - * @param args the key to append - * @param clientID the client requesting block. - * @return the reference to the new block. - * @throws IOException - */ - KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID) - throws IOException; - /** - * Given the args of a key to put, write an open key entry to meta data. - * - * In case that the container creation or key write failed on - * DistributedStorageHandler, this key's metadata will still stay in KSM. - * TODO garbage collect the open keys that never get closed - * - * @param args the args of the key provided by client. - * @return a OpenKeySession instance client uses to talk to container. - * @throws Exception - */ - OpenKeySession openKey(KsmKeyArgs args) throws IOException; - - /** - * Look up an existing key. Return the info of the key to client side, which - * DistributedStorageHandler will use to access the data on datanode. - * - * @param args the args of the key provided by client. - * @return a KsmKeyInfo instance client uses to talk to container. - * @throws IOException - */ - KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException; - - /** - * Deletes an object by an object key. The key will be immediately removed - * from KSM namespace and become invisible to clients. The object data - * will be removed in async manner that might retain for some time. - * - * @param args the args of the key provided by client. - * @throws IOException if specified key doesn't exist or - * some other I/O errors while deleting an object. - */ - void deleteKey(KsmKeyArgs args) throws IOException; - - /** - * Returns a list of keys represented by {@link KsmKeyInfo} - * in the given bucket. - * - * @param volumeName - * the name of the volume. - * @param bucketName - * the name of the bucket. - * @param startKey - * the start key name, only the keys whose name is - * after this value will be included in the result. - * This key is excluded from the result. - * @param keyPrefix - * key name prefix, only the keys whose name has - * this prefix will be included in the result. - * @param maxKeys - * the maximum number of keys to return. It ensures - * the size of the result will not exceed this limit. - * @return a list of keys. - * @throws IOException - */ - List<KsmKeyInfo> listKeys(String volumeName, - String bucketName, String startKey, String keyPrefix, int maxKeys) - throws IOException; - - /** - * Returns a list of pending deletion key info that ups to the given count. - * Each entry is a {@link BlockGroup}, which contains the info about the - * key name and all its associated block IDs. A pending deletion key is - * stored with #deleting# prefix in KSM DB. - * - * @param count max number of keys to return. - * @return a list of {@link BlockGroup} representing keys and blocks. - * @throws IOException - */ - List<BlockGroup> getPendingDeletionKeys(int count) throws IOException; - - /** - * Deletes a pending deletion key by its name. This is often called when - * key can be safely deleted from this layer. Once called, all footprints - * of the key will be purged from KSM DB. - * - * @param objectKeyName object key name with #deleting# prefix. - * @throws IOException if specified key doesn't exist or other I/O errors. - */ - void deletePendingDeletionKey(String objectKeyName) throws IOException; - - /** - * Returns a list of all still open key info. Which contains the info about - * the key name and all its associated block IDs. A pending open key has - * prefix #open# in KSM DB. - * - * @return a list of {@link BlockGroup} representing keys and blocks. - * @throws IOException - */ - List<BlockGroup> getExpiredOpenKeys() throws IOException; - - /** - * Deletes a expired open key by its name. Called when a hanging key has been - * lingering for too long. Once called, the open key entries gets removed - * from KSM mdata data. - * - * @param objectKeyName object key name with #open# prefix. - * @throws IOException if specified key doesn't exist or other I/O errors. - */ - void deleteExpiredOpenKey(String objectKeyName) throws IOException; -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org