http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java deleted file mode 100644 index 1fe9a18..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java +++ /dev/null @@ -1,1138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.web.localstorage; - -import com.google.common.base.Preconditions; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.client.io.LengthInputStream; -import org.apache.hadoop.ozone.web.exceptions.ErrorTable; -import org.apache.hadoop.ozone.client.rest.OzoneException; -import org.apache.hadoop.ozone.web.handlers.BucketArgs; -import org.apache.hadoop.ozone.web.handlers.KeyArgs; -import org.apache.hadoop.ozone.web.handlers.ListArgs; -import org.apache.hadoop.ozone.web.handlers.UserArgs; -import org.apache.hadoop.ozone.web.handlers.VolumeArgs; -import org.apache.hadoop.ozone.OzoneAcl; -import org.apache.hadoop.ozone.web.response.BucketInfo; -import org.apache.hadoop.ozone.web.response.KeyInfo; -import org.apache.hadoop.ozone.web.response.ListBuckets; -import org.apache.hadoop.ozone.web.response.ListKeys; -import org.apache.hadoop.ozone.web.response.ListVolumes; -import org.apache.hadoop.ozone.web.response.VolumeInfo; -import org.apache.hadoop.ozone.web.response.VolumeOwner; -import org.apache.hadoop.utils.MetadataStore; -import org.apache.hadoop.utils.MetadataStoreBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.Charset; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; -import java.util.ListIterator; -import java.util.Locale; -import java.util.TimeZone; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * A stand alone Ozone implementation that allows us to run Ozone tests in local - * mode. This acts as the ozone backend when using MiniDFSCluster for testing. - */ -public final class OzoneMetadataManager { - - /* - OzoneMetadataManager manages volume/bucket/object metadata and - data. - - Metadata is maintained in 2 level DB files, UserDB and MetadataDB. - - UserDB contains a Name and a List. For example volumes owned by the user - bilbo, would be maintained in UserDB as {bilbo}->{shire, rings} - - This list part of mapping is context sensitive. That is, if you use {user - name} as the key, the list you get is a list of volumes. if you use - {user/volume} as the key the list you get is list of buckets. if you use - {user/volume/bucket} as key the list you get is the list of objects. - - All keys in the UserDB starts with the UserName. - - We also need to maintain a flat namespace for volumes. This is - maintained by the MetadataDB. MetadataDB contains the name of an - object(volume, bucket or key) and its associated metadata. - The keys in the Metadata DB are {volume}, {volume/bucket} or - {volume/bucket/key}. User name is absent, so we have a common root name - space for the volume. - - The value of part of metadataDB points to corresponding *Info structures. - {volume] -> volumeInfo - {volume/bucket} -> bucketInfo - {volume/bucket/key} -> keyInfo - - - Here are various work flows : - - CreateVolume -> Check if Volume exists in metadataDB, if not update UserDB - with a list of volumes and update metadataDB with VolumeInfo. - - DeleteVolume -> Check the Volume, and check the VolumeInfo->bucketCount. - if bucketCount == 0, delete volume from userDB->{List of volumes} and - metadataDB. - - Very similar work flows exist for CreateBucket and DeleteBucket. - - // Please note : These database operations are *not* transactional, - // which means that failure can lead to inconsistencies. - // Only way to recover is to reset to a clean state, or - // use rm -rf /tmp/ozone :) - - We have very simple locking policy. We have a ReaderWriter lock that is - taken for each action, this lock is aptly named "lock". - - All actions *must* be performed with a lock held, either a read - lock or a write lock. Violation of these locking policies can be harmful. - - - // // IMPORTANT : - // // This is a simulation layer, this is NOT how the real - // // OZONE functions. This is written to so that we can write - // // stand-alone tests for the protocol and client code. - -*/ - static final Logger LOG = LoggerFactory.getLogger(OzoneMetadataManager.class); - private static final String USER_DB = "/user.db"; - private static final String META_DB = "/metadata.db"; - private static OzoneMetadataManager bm = null; - private MetadataStore userDB; - private MetadataStore metadataDB; - private ReadWriteLock lock; - private Charset encoding = Charset.forName("UTF-8"); - private String storageRoot; - private static final String OBJECT_DIR = "/_objects/"; - - // This table keeps a pointer to objects whose operations - // are in progress but not yet committed to persistent store - private ConcurrentHashMap<OutputStream, String> inProgressObjects; - - /** - * Constructs OzoneMetadataManager. - */ - private OzoneMetadataManager(Configuration conf) throws IOException { - - lock = new ReentrantReadWriteLock(); - storageRoot = - conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, - OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT); - - File file = new File(storageRoot + OBJECT_DIR); - - if (!file.exists() && !file.mkdirs()) { - LOG.error("Creation of Ozone root failed. " + file.toString()); - throw new IOException("Creation of Ozone root failed."); - } - - try { - userDB = MetadataStoreBuilder.newBuilder() - .setDbFile(new File(storageRoot + USER_DB)) - .setCreateIfMissing(true) - .build(); - metadataDB = MetadataStoreBuilder.newBuilder() - .setDbFile(new File(storageRoot + META_DB)) - .setCreateIfMissing(true) - .build(); - inProgressObjects = new ConcurrentHashMap<>(); - } catch (IOException ex) { - LOG.error("Cannot open db :" + ex.getMessage()); - throw ex; - } - } - - /** - * Gets Ozone Manager. - * - * @return OzoneMetadataManager - */ - public static synchronized OzoneMetadataManager - getOzoneMetadataManager(Configuration conf) throws IOException { - if (bm == null) { - bm = new OzoneMetadataManager(conf); - } - return bm; - } - - /** - * Creates a volume. - * - * @param args - VolumeArgs - * @throws OzoneException - */ - public void createVolume(VolumeArgs args) throws OzoneException { - lock.writeLock().lock(); - try { - SimpleDateFormat format = - new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US); - format.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE)); - - byte[] volumeName = - metadataDB.get(args.getVolumeName().getBytes(encoding)); - - if (volumeName != null) { - LOG.debug("Volume {} already exists.", volumeName); - throw ErrorTable.newError(ErrorTable.VOLUME_ALREADY_EXISTS, args); - } - - VolumeInfo newVInfo = new VolumeInfo(args.getVolumeName(), format - .format(new Date(System.currentTimeMillis())), args.getAdminName()); - - newVInfo.setQuota(args.getQuota()); - VolumeOwner owner = new VolumeOwner(args.getUserName()); - newVInfo.setOwner(owner); - - ListVolumes volumeList; - byte[] userVolumes = userDB.get(args.getUserName().getBytes(encoding)); - if (userVolumes == null) { - volumeList = new ListVolumes(); - } else { - volumeList = ListVolumes.parse(new String(userVolumes, encoding)); - } - - volumeList.addVolume(newVInfo); - volumeList.sort(); - - // Please note : These database operations are *not* transactional, - // which means that failure can lead to inconsistencies. - // Only way to recover is to reset to a clean state, or - // use rm -rf /tmp/ozone :) - - - userDB.put(args.getUserName().getBytes(encoding), - volumeList.toDBString().getBytes(encoding)); - - metadataDB.put(args.getVolumeName().getBytes(encoding), - newVInfo.toDBString().getBytes(encoding)); - - } catch (IOException ex) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Updates the Volume properties like Owner Name and Quota. - * - * @param args - Volume Args - * @param property - Flag which tells us what property to upgrade - * @throws OzoneException - */ - public void setVolumeProperty(VolumeArgs args, VolumeProperty property) - throws OzoneException { - lock.writeLock().lock(); - try { - byte[] volumeInfo = - metadataDB.get(args.getVolumeName().getBytes(encoding)); - if (volumeInfo == null) { - throw ErrorTable.newError(ErrorTable.VOLUME_NOT_FOUND, args); - } - VolumeInfo info = VolumeInfo.parse(new String(volumeInfo, encoding)); - - byte[] userBytes = userDB.get(args.getResourceName().getBytes(encoding)); - ListVolumes volumeList; - if (userBytes == null) { - volumeList = new ListVolumes(); - } else { - volumeList = ListVolumes.parse(new String(userBytes, encoding)); - } - - switch (property) { - case OWNER: - // needs new owner, we delete the volume object from the - // old user's volume list - removeOldOwner(info); - VolumeOwner owner = new VolumeOwner(args.getUserName()); - // set the new owner - info.setOwner(owner); - break; - case QUOTA: - // if this is quota update we just remove the old object from the - // current users list and update the same object later. - volumeList.getVolumes().remove(info); - info.setQuota(args.getQuota()); - break; - default: - OzoneException ozEx = - ErrorTable.newError(ErrorTable.BAD_PROPERTY, args); - ozEx.setMessage("Volume property is not recognized"); - throw ozEx; - } - - volumeList.addVolume(info); - - metadataDB.put(args.getVolumeName().getBytes(encoding), - info.toDBString().getBytes(encoding)); - - // if this is an owner change this put will create a new owner or update - // the owner's volume list. - userDB.put(args.getResourceName().getBytes(encoding), - volumeList.toDBString().getBytes(encoding)); - - } catch (IOException ex) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Removes the old owner from the volume. - * - * @param info - VolumeInfo - * @throws IOException - */ - private void removeOldOwner(VolumeInfo info) throws IOException { - // We need to look the owner that we know is the current owner - byte[] volumeBytes = - userDB.get(info.getOwner().getName().getBytes(encoding)); - ListVolumes volumeList = - ListVolumes.parse(new String(volumeBytes, encoding)); - volumeList.getVolumes().remove(info); - - // Write the new list info to the old user data - userDB.put(info.getOwner().getName().getBytes(encoding), - volumeList.toDBString().getBytes(encoding)); - } - - /** - * Checks if you are the owner of a specific volume. - * - * @param volume - Volume Name whose access permissions needs to be checked - * @param acl - requested acls which needs to be checked for access - * @return - True if you are the owner, false otherwise - * @throws OzoneException - */ - public boolean checkVolumeAccess(String volume, OzoneAcl acl) - throws OzoneException { - lock.readLock().lock(); - try { - byte[] volumeInfo = - metadataDB.get(volume.getBytes(encoding)); - if (volumeInfo == null) { - throw ErrorTable.newError(ErrorTable.VOLUME_NOT_FOUND, null); - } - - VolumeInfo info = VolumeInfo.parse(new String(volumeInfo, encoding)); - return info.getOwner().getName().equals(acl.getName()); - } catch (IOException ex) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, null, ex); - } finally { - lock.readLock().unlock(); - } - } - - /** - * getVolumeInfo returns the Volume Info of a specific volume. - * - * @param args - Volume args - * @return VolumeInfo - * @throws OzoneException - */ - public VolumeInfo getVolumeInfo(VolumeArgs args) throws OzoneException { - lock.readLock().lock(); - try { - byte[] volumeInfo = - metadataDB.get(args.getVolumeName().getBytes(encoding)); - if (volumeInfo == null) { - throw ErrorTable.newError(ErrorTable.VOLUME_NOT_FOUND, args); - } - - return VolumeInfo.parse(new String(volumeInfo, encoding)); - } catch (IOException ex) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); - } finally { - lock.readLock().unlock(); - } - } - - /** - * Returns all the volumes owned by a specific user. - * - * @param args - User Args - * @return - ListVolumes - * @throws OzoneException - */ - public ListVolumes listVolumes(ListArgs args) throws OzoneException { - lock.readLock().lock(); - try { - if (args.isRootScan()) { - return listAllVolumes(args); - } - - UserArgs uArgs = (UserArgs) args.getArgs(); - byte[] volumeList = userDB.get(uArgs.getUserName().getBytes(encoding)); - if (volumeList == null) { - throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, uArgs); - } - - String prefix = args.getPrefix(); - int maxCount = args.getMaxKeys(); - String prevKey = args.getPrevKey(); - if (prevKey != null) { - // Format is username/volumeName, in local mode we don't use the - // user name since we have a userName DB. - String[] volName = args.getPrevKey().split("/"); - if (volName.length < 2) { - throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, uArgs); - } - prevKey = volName[1]; - } - return getFilteredVolumes(volumeList, prefix, prevKey, maxCount); - } catch (IOException ex) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args.getArgs(), ex); - } finally { - lock.readLock().unlock(); - } - } - - /** - * Returns a List of Volumes that meet the prefix, prevkey and maxCount - * constraints. - * - * @param volumeList - Byte Array of Volume Info. - * @param prefix - prefix string. - * @param prevKey - PrevKey - * @param maxCount - Maximum Count. - * @return ListVolumes. - * @throws IOException - */ - private ListVolumes getFilteredVolumes(byte[] volumeList, String prefix, - String prevKey, int maxCount) throws - IOException { - ListVolumes volumes = ListVolumes.parse(new String(volumeList, - encoding)); - int currentCount = 0; - ListIterator<VolumeInfo> iter = volumes.getVolumes().listIterator(); - ListVolumes filteredVolumes = new ListVolumes(); - while (currentCount < maxCount && iter.hasNext()) { - VolumeInfo vInfo = iter.next(); - if (isMatchingPrefix(prefix, vInfo) && isAfterKey(prevKey, vInfo)) { - filteredVolumes.addVolume(vInfo); - currentCount++; - } - } - return filteredVolumes; - } - - /** - * Returns all volumes in a cluster. - * - * @param args - ListArgs. - * @return ListVolumes. - * @throws OzoneException - */ - public ListVolumes listAllVolumes(ListArgs args) - throws OzoneException, IOException { - String prefix = args.getPrefix(); - final String prevKey; - int maxCount = args.getMaxKeys(); - String userName = null; - - if (args.getPrevKey() != null) { - // Format is username/volumeName - String[] volName = args.getPrevKey().split("/"); - if (volName.length < 2) { - throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs()); - } - - byte[] userNameBytes = userDB.get(volName[0].getBytes(encoding)); - userName = new String(userNameBytes, encoding); - prevKey = volName[1]; - } else { - userName = new String(userDB.peekAround(0, null).getKey(), encoding); - prevKey = null; - } - - if (userName == null || userName.isEmpty()) { - throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs()); - } - - ListVolumes returnSet = new ListVolumes(); - // we need to iterate through users until we get maxcount volumes - // or no more volumes are left. - userDB.iterate(null, (key, value) -> { - int currentSize = returnSet.getVolumes().size(); - if (currentSize < maxCount) { - String name = new String(key, encoding); - byte[] volumeList = userDB.get(name.getBytes(encoding)); - if (volumeList == null) { - throw new IOException( - ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs())); - } - returnSet.getVolumes().addAll( - getFilteredVolumes(volumeList, prefix, prevKey, - maxCount - currentSize).getVolumes()); - return true; - } else { - return false; - } - }); - - return returnSet; - } - - /** - * Checks if a name starts with a matching prefix. - * - * @param prefix - prefix string. - * @param vInfo - volume info. - * @return true or false. - */ - private boolean isMatchingPrefix(String prefix, VolumeInfo vInfo) { - if (prefix == null || prefix.isEmpty()) { - return true; - } - return vInfo.getVolumeName().startsWith(prefix); - } - - /** - * Checks if the key is after the prevKey. - * - * @param prevKey - String prevKey. - * @param vInfo - volume Info. - * @return - true or false. - */ - private boolean isAfterKey(String prevKey, VolumeInfo vInfo) { - if (prevKey == null || prevKey.isEmpty()) { - return true; - } - return prevKey.compareTo(vInfo.getVolumeName()) < 0; - } - - /** - * Deletes a volume if it exists and is empty. - * - * @param args - volume args - * @throws OzoneException - */ - public void deleteVolume(VolumeArgs args) throws OzoneException { - lock.writeLock().lock(); - try { - byte[] volumeName = - metadataDB.get(args.getVolumeName().getBytes(encoding)); - if (volumeName == null) { - throw ErrorTable.newError(ErrorTable.VOLUME_NOT_FOUND, args); - } - - VolumeInfo vInfo = VolumeInfo.parse(new String(volumeName, encoding)); - - // Only remove volumes if they are empty. - if (vInfo.getBucketCount() > 0) { - throw ErrorTable.newError(ErrorTable.VOLUME_NOT_EMPTY, args); - } - - ListVolumes volumeList; - String user = vInfo.getOwner().getName(); - byte[] userVolumes = userDB.get(user.getBytes(encoding)); - if (userVolumes == null) { - throw ErrorTable.newError(ErrorTable.VOLUME_NOT_FOUND, args); - } - - volumeList = ListVolumes.parse(new String(userVolumes, encoding)); - volumeList.getVolumes().remove(vInfo); - - metadataDB.delete(args.getVolumeName().getBytes(encoding)); - userDB.put(user.getBytes(encoding), - volumeList.toDBString().getBytes(encoding)); - } catch (IOException ex) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Create a bucket if it does not exist. - * - * @param args - BucketArgs - * @throws OzoneException - */ - public void createBucket(BucketArgs args) throws OzoneException { - lock.writeLock().lock(); - try { - // check if volume exists, buckets cannot be created without volumes - byte[] volumeName = metadataDB.get(args.getVolumeName() - .getBytes(encoding)); - if (volumeName == null) { - throw ErrorTable.newError(ErrorTable.VOLUME_NOT_FOUND, args); - } - - // A resource name is volume/bucket -- That is the key in metadata table - byte[] bucketName = metadataDB.get(args.getResourceName() - .getBytes(encoding)); - if (bucketName != null) { - throw ErrorTable.newError(ErrorTable.BUCKET_ALREADY_EXISTS, args); - } - - BucketInfo bucketInfo = - new BucketInfo(args.getVolumeName(), args.getBucketName()); - - if (args.getRemoveAcls() != null) { - OzoneException ex = ErrorTable.newError(ErrorTable.MALFORMED_ACL, args); - ex.setMessage("Remove ACLs specified in bucket create. Please remove " - + "them and retry."); - throw ex; - } - - VolumeInfo volInfo = VolumeInfo.parse(new String(volumeName, encoding)); - volInfo.setBucketCount(volInfo.getBucketCount() + 1); - - bucketInfo.setAcls(args.getAddAcls()); - bucketInfo.setStorageType(args.getStorageType()); - bucketInfo.setVersioning(args.getVersioning()); - ListBuckets bucketList; - - // get bucket list from user/volume -> bucketList - byte[] volumeBuckets = userDB.get(args.getParentName() - .getBytes(encoding)); - if (volumeBuckets == null) { - bucketList = new ListBuckets(); - } else { - bucketList = ListBuckets.parse(new String(volumeBuckets, encoding)); - } - - bucketList.addBucket(bucketInfo); - bucketList.sort(); - - // Update Volume->bucketCount - userDB.put(args.getVolumeName().getBytes(encoding), - volInfo.toDBString().getBytes(encoding)); - - // Now update the userDB with user/volume -> bucketList - userDB.put(args.getParentName().getBytes(encoding), - bucketList.toDBString().getBytes(encoding)); - - // Update userDB with volume/bucket -> empty key list - userDB.put(args.getResourceName().getBytes(encoding), - new ListKeys().toDBString().getBytes(encoding)); - - // and update the metadataDB with volume/bucket->BucketInfo - metadataDB.put(args.getResourceName().getBytes(encoding), - bucketInfo.toDBString().getBytes(encoding)); - - } catch (IOException ex) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Updates the Bucket properties like ACls and Storagetype. - * - * @param args - Bucket Args - * @param property - Flag which tells us what property to upgrade - * @throws OzoneException - */ - public void setBucketProperty(BucketArgs args, BucketProperty property) - throws OzoneException { - - lock.writeLock().lock(); - try { - // volume/bucket-> bucketInfo - byte[] bucketInfo = metadataDB.get(args.getResourceName(). - getBytes(encoding)); - if (bucketInfo == null) { - throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); - } - - BucketInfo info = BucketInfo.parse(new String(bucketInfo, encoding)); - byte[] volumeBuckets = userDB.get(args.getParentName() - .getBytes(encoding)); - ListBuckets bucketList = ListBuckets.parse(new String(volumeBuckets, - encoding)); - bucketList.getBuckets().remove(info); - - switch (property) { - case ACLS: - processRemoveAcls(args, info); - processAddAcls(args, info); - break; - case STORAGETYPE: - info.setStorageType(args.getStorageType()); - break; - case VERSIONING: - info.setVersioning(args.getVersioning()); - break; - default: - OzoneException ozEx = - ErrorTable.newError(ErrorTable.BAD_PROPERTY, args); - ozEx.setMessage("Bucket property is not recognized."); - throw ozEx; - } - - bucketList.addBucket(info); - metadataDB.put(args.getResourceName().getBytes(encoding), - info.toDBString().getBytes(encoding)); - - userDB.put(args.getParentName().getBytes(encoding), - bucketList.toDBString().getBytes(encoding)); - } catch (IOException ex) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Process Remove Acls and remove them from the bucket. - * - * @param args - BucketArgs - * @param info - BucketInfo - */ - private void processRemoveAcls(BucketArgs args, BucketInfo info) { - List<OzoneAcl> removeAcls = args.getRemoveAcls(); - if ((removeAcls == null) || (info.getAcls() == null)) { - return; - } - for (OzoneAcl racl : args.getRemoveAcls()) { - ListIterator<OzoneAcl> aclIter = info.getAcls().listIterator(); - while (aclIter.hasNext()) { - if (racl.equals(aclIter.next())) { - aclIter.remove(); - break; - } - } - } - } - - /** - * Process Add Acls and Add them to the bucket. - * - * @param args - BucketArgs - * @param info - BucketInfo - */ - private void processAddAcls(BucketArgs args, BucketInfo info) { - List<OzoneAcl> addAcls = args.getAddAcls(); - if ((addAcls == null)) { - return; - } - - if (info.getAcls() == null) { - info.setAcls(addAcls); - return; - } - - for (OzoneAcl newacl : addAcls) { - ListIterator<OzoneAcl> aclIter = info.getAcls().listIterator(); - while (aclIter.hasNext()) { - if (newacl.equals(aclIter.next())) { - continue; - } - } - info.getAcls().add(newacl); - } - } - - /** - * Deletes a given bucket. - * - * @param args - BucketArgs - * @throws OzoneException - */ - public void deleteBucket(BucketArgs args) throws OzoneException { - lock.writeLock().lock(); - try { - byte[] bucketInfo = metadataDB.get(args.getResourceName() - .getBytes(encoding)); - if (bucketInfo == null) { - throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); - } - - BucketInfo bInfo = BucketInfo.parse(new String(bucketInfo, encoding)); - - // Only remove buckets if they are empty. - if (bInfo.getKeyCount() > 0) { - throw ErrorTable.newError(ErrorTable.BUCKET_NOT_EMPTY, args); - } - - byte[] bucketBytes = userDB.get(args.getParentName().getBytes(encoding)); - if (bucketBytes == null) { - throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); - } - - ListBuckets bucketList = - ListBuckets.parse(new String(bucketBytes, encoding)); - bucketList.getBuckets().remove(bInfo); - - metadataDB.delete(args.getResourceName().getBytes(encoding)); - userDB.put(args.getParentName().getBytes(encoding), - bucketList.toDBString().getBytes(encoding)); - } catch (IOException ex) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Returns the Bucket info for a given bucket. - * - * @param args - Bucket Args - * @return BucketInfo - Bucket Information - * @throws OzoneException - */ - public BucketInfo getBucketInfo(BucketArgs args) throws OzoneException { - lock.readLock().lock(); - try { - byte[] bucketBytes = metadataDB.get(args.getResourceName() - .getBytes(encoding)); - if (bucketBytes == null) { - throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); - } - - return BucketInfo.parse(new String(bucketBytes, encoding)); - } catch (IOException ex) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex); - } finally { - lock.readLock().unlock(); - } - } - - /** - * Returns a list of buckets for a given volume. - * - * @param args - volume args - * @return List of buckets - * @throws OzoneException - */ - public ListBuckets listBuckets(ListArgs args) throws OzoneException { - lock.readLock().lock(); - try { - Preconditions.checkState(args.getArgs() instanceof VolumeArgs); - VolumeArgs vArgs = (VolumeArgs) args.getArgs(); - String userVolKey = vArgs.getUserName() + "/" + vArgs.getVolumeName(); - - // TODO : Query using Prefix and PrevKey - byte[] bucketBytes = userDB.get(userVolKey.getBytes(encoding)); - if (bucketBytes == null) { - throw ErrorTable.newError(ErrorTable.INVALID_VOLUME_NAME, - args.getArgs()); - } - return ListBuckets.parse(new String(bucketBytes, encoding)); - } catch (IOException ex) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args.getArgs(), ex); - } finally { - lock.readLock().unlock(); - } - } - - /** - * Creates a key and returns a stream to which this key can be written to. - * - * @param args KeyArgs - * @return - A stream into which key can be written to. - * @throws OzoneException - */ - public OutputStream createKey(KeyArgs args) throws OzoneException { - lock.writeLock().lock(); - try { - String fileNameHash = DigestUtils.sha256Hex(args.getResourceName()); - - // Please don't try trillion objects unless the physical file system - // is capable of doing that in a single directory. - - String fullPath = storageRoot + OBJECT_DIR + fileNameHash; - File f = new File(fullPath); - - // In real ozone it would not be this way, a file will be overwritten - // only if the upload is successful. - if (f.exists()) { - LOG.debug("we are overwriting a file. This is by design."); - if (!f.delete()) { - LOG.error("Unable to delete the file: {}", fullPath); - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args); - } - } - - // f.createNewFile(); - FileOutputStream fsStream = new FileOutputStream(f); - inProgressObjects.put(fsStream, fullPath); - - return fsStream; - } catch (IOException e) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * commit keys moves an In progress object into the metadata store so that key - * is visible in the metadata operations from that point onwards. - * - * @param args Object args - * @throws OzoneException - */ - public void commitKey(KeyArgs args, OutputStream stream) - throws OzoneException { - SimpleDateFormat format = - new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US); - lock.writeLock().lock(); - - try { - byte[] bucketInfo = metadataDB.get(args.getParentName() - .getBytes(encoding)); - if (bucketInfo == null) { - throw ErrorTable.newError(ErrorTable.INVALID_RESOURCE_NAME, args); - } - BucketInfo bInfo = BucketInfo.parse(new String(bucketInfo, encoding)); - bInfo.setKeyCount(bInfo.getKeyCount() + 1); - - String fileNameHash = inProgressObjects.get(stream); - inProgressObjects.remove(stream); - if (fileNameHash == null) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args); - } - - ListKeys keyList; - byte[] bucketListBytes = userDB.get(args.getParentName() - .getBytes(encoding)); - keyList = ListKeys.parse(new String(bucketListBytes, encoding)); - KeyInfo keyInfo; - - byte[] objectBytes = metadataDB.get(args.getResourceName() - .getBytes(encoding)); - - if (objectBytes != null) { - // we are overwriting an existing object. - // TODO : Emit info for Accounting - keyInfo = KeyInfo.parse(new String(objectBytes, encoding)); - keyList.getKeyList().remove(keyInfo); - } else { - keyInfo = new KeyInfo(); - } - - keyInfo.setCreatedOn(format.format(new Date(System.currentTimeMillis()))); - - // TODO : support version, we need to check if versioning - // is switched on the bucket and make appropriate calls. - keyInfo.setVersion(0); - - keyInfo.setDataFileName(fileNameHash); - keyInfo.setKeyName(args.getKeyName()); - keyInfo.setMd5hash(args.getHash()); - keyInfo.setSize(args.getSize()); - - keyList.getKeyList().add(keyInfo); - - // if the key exists, we overwrite happily :). since the - // earlier call - createObject - has overwritten the data. - - metadataDB.put(args.getResourceName().getBytes(encoding), - keyInfo.toDBString().getBytes(encoding)); - - metadataDB.put(args.getParentName().getBytes(encoding), - bInfo.toDBString().getBytes(encoding)); - - userDB.put(args.getParentName().getBytes(encoding), - keyList.toDBString().getBytes(encoding)); - - } catch (IOException e) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * deletes an key from a given bucket. - * - * @param args - ObjectArgs - * @throws OzoneException - */ - public void deleteKey(KeyArgs args) throws OzoneException { - lock.writeLock().lock(); - try { - byte[] bucketInfo = metadataDB.get(args.getParentName() - .getBytes(encoding)); - if (bucketInfo == null) { - throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); - } - BucketInfo bInfo = BucketInfo.parse(new String(bucketInfo, encoding)); - bInfo.setKeyCount(bInfo.getKeyCount() - 1); - - - byte[] bucketListBytes = userDB.get(args.getParentName() - .getBytes(encoding)); - if (bucketListBytes == null) { - throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); - } - ListKeys keyList = ListKeys.parse(new String(bucketListBytes, encoding)); - - - byte[] objectBytes = metadataDB.get(args.getResourceName() - .getBytes(encoding)); - if (objectBytes == null) { - throw ErrorTable.newError(ErrorTable.INVALID_KEY, args); - } - - KeyInfo oInfo = KeyInfo.parse(new String(objectBytes, encoding)); - keyList.getKeyList().remove(oInfo); - - String fileNameHash = DigestUtils.sha256Hex(args.getResourceName()); - - String fullPath = storageRoot + OBJECT_DIR + fileNameHash; - File f = new File(fullPath); - - if (f.exists()) { - if (!f.delete()) { - throw ErrorTable.newError(ErrorTable.KEY_OPERATION_CONFLICT, args); - } - } else { - throw ErrorTable.newError(ErrorTable.INVALID_KEY, args); - } - - - metadataDB.delete(args.getResourceName().getBytes(encoding)); - metadataDB.put(args.getParentName().getBytes(encoding), - bInfo.toDBString().getBytes(encoding)); - userDB.put(args.getParentName().getBytes(encoding), - keyList.toDBString().getBytes(encoding)); - } catch (IOException e) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Returns a Stream for the file. - * - * @param args - Object args - * @return Stream - * @throws IOException - * @throws OzoneException - */ - public LengthInputStream newKeyReader(KeyArgs args) - throws IOException, OzoneException { - lock.readLock().lock(); - try { - String fileNameHash = DigestUtils.sha256Hex(args.getResourceName()); - String fullPath = storageRoot + OBJECT_DIR + fileNameHash; - File f = new File(fullPath); - if (!f.exists()) { - throw ErrorTable.newError(ErrorTable.INVALID_RESOURCE_NAME, args); - } - long size = f.length(); - - FileInputStream fileStream = new FileInputStream(f); - return new LengthInputStream(fileStream, size); - } finally { - lock.readLock().unlock(); - } - } - - /** - * Returns keys in a bucket. - * - * @param args - * @return List of keys. - * @throws IOException - * @throws OzoneException - */ - public ListKeys listKeys(ListArgs args) throws IOException, OzoneException { - lock.readLock().lock(); - // TODO : Support Prefix and PrevKey lookup. - try { - Preconditions.checkState(args.getArgs() instanceof BucketArgs); - BucketArgs bArgs = (BucketArgs) args.getArgs(); - byte[] bucketInfo = metadataDB.get(bArgs.getResourceName() - .getBytes(encoding)); - if (bucketInfo == null) { - throw ErrorTable.newError(ErrorTable.INVALID_RESOURCE_NAME, bArgs); - } - - byte[] bucketListBytes = userDB.get(bArgs.getResourceName() - .getBytes(encoding)); - if (bucketListBytes == null) { - throw ErrorTable.newError(ErrorTable.INVALID_RESOURCE_NAME, bArgs); - } - return ListKeys.parse(new String(bucketListBytes, encoding)); - } finally { - lock.readLock().unlock(); - } - } - - /** - * Get the Key information for a given key. - * - * @param args - Key Args - * @return KeyInfo - Key Information - * @throws OzoneException - */ - public KeyInfo getKeyInfo(KeyArgs args) throws IOException, OzoneException { - lock.readLock().lock(); - try { - byte[] bucketInfo = metadataDB - .get(args.getParentName().getBytes(encoding)); - if (bucketInfo == null) { - throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); - } - - byte[] bucketListBytes = userDB - .get(args.getParentName().getBytes(encoding)); - if (bucketListBytes == null) { - throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); - } - - byte[] objectBytes = metadataDB - .get(args.getResourceName().getBytes(encoding)); - if (objectBytes == null) { - throw ErrorTable.newError(ErrorTable.INVALID_KEY, args); - } - - return KeyInfo.parse(new String(objectBytes, encoding)); - } catch (IOException e) { - throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e); - } finally { - lock.readLock().unlock(); - } - } - - /** - * This is used in updates to volume metadata. - */ - public enum VolumeProperty { - OWNER, QUOTA - } - - /** - * Bucket Properties. - */ - public enum BucketProperty { - ACLS, STORAGETYPE, VERSIONING - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/package-info.java deleted file mode 100644 index 6bf6643..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.web.localstorage; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/LengthInputStreamMessageBodyWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/LengthInputStreamMessageBodyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/LengthInputStreamMessageBodyWriter.java deleted file mode 100644 index 6db49d5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/LengthInputStreamMessageBodyWriter.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.web.messages; - -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.client.io.LengthInputStream; - -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.ext.MessageBodyWriter; -import java.io.IOException; -import java.io.OutputStream; -import java.lang.annotation.Annotation; -import java.lang.reflect.Type; - -/** - * Writes outbound HTTP response object bytes. The content length is determined - * from the {@link LengthInputStream}. - */ -public final class LengthInputStreamMessageBodyWriter - implements MessageBodyWriter<LengthInputStream> { - private static final int CHUNK_SIZE = 8192; - - @Override - public long getSize(LengthInputStream lis, Class<?> type, Type genericType, - Annotation[] annotations, MediaType mediaType) { - return lis.getLength(); - } - - @Override - public boolean isWriteable(Class<?> type, Type genericType, - Annotation[] annotations, MediaType mediaType) { - return LengthInputStream.class.isAssignableFrom(type); - } - - @Override - public void writeTo(LengthInputStream lis, Class<?> type, Type genericType, - Annotation[] annotations, MediaType mediaType, - MultivaluedMap<String, Object> httpHeaders, - OutputStream out) throws IOException { - IOUtils.copyBytes(lis, out, CHUNK_SIZE); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/StringMessageBodyWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/StringMessageBodyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/StringMessageBodyWriter.java deleted file mode 100644 index ad637af..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/StringMessageBodyWriter.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.web.messages; - -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.web.utils.OzoneUtils; - -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.ext.MessageBodyWriter; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.lang.annotation.Annotation; -import java.lang.reflect.Type; - -/** - * Writes outbound HTTP response strings. We use this rather than the built-in - * writer so that we can determine content length from the string length instead - * of possibly falling back to a chunked response. - */ -public final class StringMessageBodyWriter implements - MessageBodyWriter<String> { - private static final int CHUNK_SIZE = 8192; - - @Override - public long getSize(String str, Class<?> type, Type genericType, - Annotation[] annotations, MediaType mediaType) { - return str.length(); - } - - @Override - public boolean isWriteable(Class<?> type, Type genericType, - Annotation[] annotations, MediaType mediaType) { - return String.class.isAssignableFrom(type); - } - - @Override - public void writeTo(String str, Class<?> type, Type genericType, - Annotation[] annotations, MediaType mediaType, - MultivaluedMap<String, Object> httpHeaders, - OutputStream out) throws IOException { - IOUtils.copyBytes(new ByteArrayInputStream( - str.getBytes(OzoneUtils.ENCODING)), out, CHUNK_SIZE); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/package-info.java deleted file mode 100644 index 273b3f5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/messages/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.web.messages; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/CloseableCleanupListener.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/CloseableCleanupListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/CloseableCleanupListener.java deleted file mode 100644 index 3d9db20..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/CloseableCleanupListener.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.web.netty; - -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import org.apache.hadoop.io.IOUtils; - -import java.io.Closeable; - -/** - * A {@link ChannelFutureListener} that closes {@link Closeable} resources. - */ -final class CloseableCleanupListener implements ChannelFutureListener { - - private final Closeable[] closeables; - - /** - * Creates a new CloseableCleanupListener. - * - * @param closeables any number of closeable resources - */ - CloseableCleanupListener(Closeable... closeables) { - this.closeables = closeables; - } - - @Override - public void operationComplete(ChannelFuture future) { - IOUtils.cleanupWithLogger(null, closeables); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreChannelHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreChannelHandler.java deleted file mode 100644 index 89c196c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreChannelHandler.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.web.netty; - -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE; -import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; - -/** - * Abstract base class for the multiple Netty channel handlers used in the - * Object Store Netty channel pipeline. - */ -abstract class ObjectStoreChannelHandler<T> - extends SimpleChannelInboundHandler<T> { - - /** Log usable in all subclasses. */ - protected static final Logger LOG = - LoggerFactory.getLogger(ObjectStoreChannelHandler.class); - - /** - * Handles uncaught exceptions in the channel pipeline by sending an internal - * server error response if the channel is still active. - * - * @param ctx ChannelHandlerContext to receive response - * @param cause Throwable that was unhandled in the channel pipeline - */ - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - LOG.error("Unexpected exception in Netty pipeline.", cause); - if (ctx.channel().isActive()) { - sendErrorResponse(ctx, INTERNAL_SERVER_ERROR); - } - } - - /** - * Sends an error response. This method is used when an unexpected error is - * encountered within the channel pipeline, outside of the actual Object Store - * application. It always closes the connection, because we can't in general - * know the state of the connection when these errors occur, so attempting to - * keep the connection alive could be unpredictable. - * - * @param ctx ChannelHandlerContext to receive response - * @param status HTTP response status - */ - protected static void sendErrorResponse(ChannelHandlerContext ctx, - HttpResponseStatus status) { - HttpResponse nettyResp = new DefaultFullHttpResponse(HTTP_1_1, status); - nettyResp.headers().set(CONTENT_LENGTH, 0); - nettyResp.headers().set(CONNECTION, CLOSE); - ctx.writeAndFlush(nettyResp).addListener(ChannelFutureListener.CLOSE); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java deleted file mode 100644 index c7b516f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java +++ /dev/null @@ -1,348 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.web.netty; - -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; -import static io.netty.handler.codec.http.HttpHeaders.Names.TRANSFER_ENCODING; -import static io.netty.handler.codec.http.HttpHeaders.Names.HOST; -import static io.netty.handler.codec.http.HttpHeaders.Values.KEEP_ALIVE; -import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.TimeUnit; - -import com.sun.jersey.core.header.InBoundHeaders; -import com.sun.jersey.spi.container.ContainerRequest; -import com.sun.jersey.spi.container.ContainerResponse; -import com.sun.jersey.spi.container.ContainerResponseWriter; -import com.sun.jersey.spi.container.WebApplication; - -import io.netty.handler.codec.http.DefaultHttpResponse; -//import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.web.interfaces.StorageHandler; -import org.apache.hadoop.ozone.web.handlers.StorageHandlerBuilder; - -/** - * This is a custom Jersey container that hosts the Object Store web - * application. It supports dispatching an inbound Netty {@link HttpRequest} - * to the Object Store Jersey application. Request dispatching must run - * asynchronously, because the Jersey application must consume the inbound - * HTTP request from a piped stream and produce the outbound HTTP response - * for another piped stream.The Netty channel handlers consume the connected - * ends of these piped streams. Request dispatching cannot run directly on - * the Netty threads, or there would be a risk of deadlock (one thread - * producing/consuming its end of the pipe while no other thread is - * producing/consuming the opposite end). - */ -public final class ObjectStoreJerseyContainer { - - private static final Logger LOG = - LoggerFactory.getLogger(ObjectStoreJerseyContainer.class); - - private final WebApplication webapp; - - private StorageHandler storageHandler; - - /** - * Creates a new ObjectStoreJerseyContainer. - * - * @param webapp web application - */ - public ObjectStoreJerseyContainer(WebApplication webapp) { - this.webapp = webapp; - } - - /** - * Sets the {@link StorageHandler}. This must be called before dispatching any - * requests. - * - * @param newStorageHandler {@link StorageHandler} implementation - */ - public void setStorageHandler(StorageHandler newStorageHandler) { - this.storageHandler = newStorageHandler; - } - - /** - * Asynchronously executes an HTTP request. - * - * @param nettyReq HTTP request - * @param reqIn input stream for reading request body - * @param respOut output stream for writing response body - */ - public Future<HttpResponse> dispatch(HttpRequest nettyReq, InputStream reqIn, - OutputStream respOut) { - // The request executes on a separate background thread. As soon as enough - // processing has completed to bootstrap the outbound response, the thread - // counts down on a latch. This latch also unblocks callers trying to get - // the asynchronous response out of the returned future. - final CountDownLatch latch = new CountDownLatch(1); - final RequestRunner runner = new RequestRunner(nettyReq, reqIn, respOut, - latch); - final Thread thread = new Thread(runner); - thread.setDaemon(true); - thread.start(); - return new Future<HttpResponse>() { - - private volatile boolean isCancelled = false; - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (latch.getCount() == 0) { - return false; - } - if (!mayInterruptIfRunning) { - return false; - } - if (!thread.isAlive()) { - return false; - } - thread.interrupt(); - try { - thread.join(); - } catch (InterruptedException e) { - LOG.info("Interrupted while attempting to cancel dispatch thread."); - Thread.currentThread().interrupt(); - return false; - } - isCancelled = true; - return true; - } - - @Override - public HttpResponse get() - throws InterruptedException, ExecutionException { - checkCancelled(); - latch.await(); - return this.getOrThrow(); - } - - @Override - public HttpResponse get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - checkCancelled(); - if (!latch.await(timeout, unit)) { - throw new TimeoutException(String.format( - "Timed out waiting for HttpResponse after %d %s.", - timeout, unit.toString().toLowerCase())); - } - return this.getOrThrow(); - } - - @Override - public boolean isCancelled() { - return isCancelled; - } - - @Override - public boolean isDone() { - return !isCancelled && latch.getCount() == 0; - } - - private void checkCancelled() { - if (isCancelled()) { - throw new CancellationException(); - } - } - - private HttpResponse getOrThrow() throws ExecutionException { - try { - return runner.getResponse(); - } catch (Exception e) { - throw new ExecutionException(e); - } - } - }; - } - - /** - * Runs the actual handling of the HTTP request. - */ - private final class RequestRunner implements Runnable, - ContainerResponseWriter { - - private final CountDownLatch latch; - private final HttpRequest nettyReq; - private final InputStream reqIn; - private final OutputStream respOut; - - private Exception exception; - private HttpResponse nettyResp; - - /** - * Creates a new RequestRunner. - * - * @param nettyReq HTTP request - * @param reqIn input stream for reading request body - * @param respOut output stream for writing response body - * @param latch for coordinating asynchronous return of HTTP response - */ - RequestRunner(HttpRequest nettyReq, InputStream reqIn, - OutputStream respOut, CountDownLatch latch) { - this.latch = latch; - this.nettyReq = nettyReq; - this.reqIn = reqIn; - this.respOut = respOut; - } - - @Override - public void run() { - LOG.trace("begin RequestRunner, nettyReq = {}", this.nettyReq); - StorageHandlerBuilder.setStorageHandler( - ObjectStoreJerseyContainer.this.storageHandler); - try { - ContainerRequest jerseyReq = nettyRequestToJerseyRequest( - ObjectStoreJerseyContainer.this.webapp, this.nettyReq, this.reqIn); - ObjectStoreJerseyContainer.this.webapp.handleRequest(jerseyReq, this); - } catch (Exception e) { - LOG.error("Error running Jersey Request Runner", e); - this.exception = e; - this.latch.countDown(); - } finally { - IOUtils.cleanupWithLogger(null, this.reqIn, this.respOut); - StorageHandlerBuilder.removeStorageHandler(); - } - LOG.trace("end RequestRunner, nettyReq = {}", this.nettyReq); - } - - /** - * This is a callback triggered by Jersey as soon as dispatch has completed - * to the point of knowing what kind of response to return. We save the - * response and trigger the latch to unblock callers waiting on the - * asynchronous return of the response. Our response always sets a - * Content-Length header. (We do not support Transfer-Encoding: chunked.) - * We also return the output stream for Jersey to use for writing the - * response body. - * - * @param contentLength length of response - * @param jerseyResp HTTP response returned by Jersey - * @return OutputStream for Jersey to use for writing the response body - */ - @Override - public OutputStream writeStatusAndHeaders(long contentLength, - ContainerResponse jerseyResp) { - LOG.trace( - "begin writeStatusAndHeaders, contentLength = {}, jerseyResp = {}.", - contentLength, jerseyResp); - this.nettyResp = jerseyResponseToNettyResponse(jerseyResp); - this.nettyResp.headers().set(CONTENT_LENGTH, Math.max(0, contentLength)); - this.nettyResp.headers().set(CONNECTION, - HttpHeaders.isKeepAlive(this.nettyReq) ? KEEP_ALIVE : CLOSE); - this.latch.countDown(); - LOG.trace( - "end writeStatusAndHeaders, contentLength = {}, jerseyResp = {}.", - contentLength, jerseyResp); - return this.respOut; - } - - /** - * This is a callback triggered by Jersey after it has completed writing the - * response body to the stream. We must close the stream here to unblock - * the Netty thread consuming the last chunk of the response from the input - * end of the piped stream. - * - * @throws IOException if there is an I/O error - */ - @Override - public void finish() throws IOException { - IOUtils.cleanupWithLogger(null, this.respOut); - } - - /** - * Gets the HTTP response calculated by the Jersey application, or throws an - * exception if an error occurred during processing. It only makes sense to - * call this method after waiting on the latch to trigger. - * - * @return HTTP response - * @throws Exception if there was an error executing the request - */ - public HttpResponse getResponse() throws Exception { - if (this.exception != null) { - throw this.exception; - } - return this.nettyResp; - } - } - - /** - * Converts a Jersey HTTP response object to a Netty HTTP response object. - * - * @param jerseyResp Jersey HTTP response - * @return Netty HTTP response - */ - private static HttpResponse jerseyResponseToNettyResponse( - ContainerResponse jerseyResp) { - HttpResponse nettyResp = new DefaultHttpResponse(HTTP_1_1, - HttpResponseStatus.valueOf(jerseyResp.getStatus())); - for (Map.Entry<String, List<Object>> header : - jerseyResp.getHttpHeaders().entrySet()) { - if (!header.getKey().equalsIgnoreCase(CONTENT_LENGTH.toString()) && - !header.getKey().equalsIgnoreCase(TRANSFER_ENCODING.toString())) { - nettyResp.headers().set(header.getKey(), header.getValue()); - } - } - return nettyResp; - } - - /** - * Converts a Netty HTTP request object to a Jersey HTTP request object. - * - * @param webapp web application - * @param nettyReq Netty HTTP request - * @param reqIn input stream for reading request body - * @return Jersey HTTP request - * @throws URISyntaxException if there is an error handling the request URI - */ - private static ContainerRequest nettyRequestToJerseyRequest( - WebApplication webapp, HttpRequest nettyReq, InputStream reqIn) - throws URISyntaxException { - HttpHeaders nettyHeaders = nettyReq.headers(); - InBoundHeaders jerseyHeaders = new InBoundHeaders(); - for (String name : nettyHeaders.names()) { - jerseyHeaders.put(name, nettyHeaders.getAll(name)); - } - String host = nettyHeaders.get(HOST); - String scheme = host.startsWith("https") ? "https://" : "http://"; - String baseUri = scheme + host + "/"; - String reqUri = scheme + host + nettyReq.getUri(); - LOG.trace("baseUri = {}, reqUri = {}", baseUri, reqUri); - return new ContainerRequest(webapp, nettyReq.getMethod().name(), - new URI(baseUri), new URI(reqUri), jerseyHeaders, reqIn); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainerProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainerProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainerProvider.java deleted file mode 100644 index e943969..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainerProvider.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.web.netty; - -import com.sun.jersey.api.container.ContainerException; -import com.sun.jersey.api.core.ResourceConfig; -import com.sun.jersey.spi.container.ContainerProvider; -import com.sun.jersey.spi.container.WebApplication; - -/** - * This is a Jersey {@link ContainerProvider} capable of boostrapping the - * Object Store web application into a custom container. It must be registered - * using the Java service loader mechanism by listing it in - * META-INF/services/com.sun.jersey.spi.container.ContainerProvider . - */ -public final class ObjectStoreJerseyContainerProvider - implements ContainerProvider<ObjectStoreJerseyContainer> { - - @Override - public ObjectStoreJerseyContainer createContainer( - Class<ObjectStoreJerseyContainer> type, ResourceConfig conf, - WebApplication webapp) throws ContainerException { - return new ObjectStoreJerseyContainer(webapp); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java deleted file mode 100644 index 0a2f22d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.web.netty; - -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http.HttpContent; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.stream.ChunkedStream; -import org.apache.hadoop.io.IOUtils; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.concurrent.Future; - -/** - * Object Store Netty channel pipeline handler that handles inbound - * {@link HttpContent} fragments for the request body by sending the bytes into - * the pipe so that the application dispatch thread can read it. - * After receiving the {@link LastHttpContent}, this handler also flushes the - * response. - */ -public final class RequestContentObjectStoreChannelHandler - extends ObjectStoreChannelHandler<HttpContent> { - - private final HttpRequest nettyReq; - private final Future<HttpResponse> nettyResp; - private final OutputStream reqOut; - private final InputStream respIn; - private ObjectStoreJerseyContainer jerseyContainer; - - /** - * Creates a new RequestContentObjectStoreChannelHandler. - * - * @param nettyReq HTTP request - * @param nettyResp asynchronous HTTP response - * @param reqOut output stream for writing request body - * @param respIn input stream for reading response body - * @param jerseyContainer jerseyContainer to handle the request - */ - public RequestContentObjectStoreChannelHandler(HttpRequest nettyReq, - Future<HttpResponse> nettyResp, OutputStream reqOut, InputStream respIn, - ObjectStoreJerseyContainer jerseyContainer) { - this.nettyReq = nettyReq; - this.nettyResp = nettyResp; - this.reqOut = reqOut; - this.respIn = respIn; - this.jerseyContainer = jerseyContainer; - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - ctx.flush(); - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, HttpContent content) - throws Exception { - LOG.trace( - "begin RequestContentObjectStoreChannelHandler channelRead0, " + - "ctx = {}, content = {}", ctx, content); - content.content().readBytes(this.reqOut, content.content().readableBytes()); - if (content instanceof LastHttpContent) { - IOUtils.cleanupWithLogger(null, this.reqOut); - ctx.write(this.nettyResp.get()); - ChannelFuture respFuture = ctx.writeAndFlush(new ChunkedStream( - this.respIn)); - respFuture.addListener(new CloseableCleanupListener(this.respIn)); - if (!HttpHeaders.isKeepAlive(this.nettyReq)) { - respFuture.addListener(ChannelFutureListener.CLOSE); - } else { - respFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - // Notify client this is the last content for current request. - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); - // Reset the pipeline handler for next request to reuses the - // same connection. - RequestDispatchObjectStoreChannelHandler h = - new RequestDispatchObjectStoreChannelHandler(jerseyContainer); - ctx.pipeline().replace(ctx.pipeline().last(), - RequestDispatchObjectStoreChannelHandler.class.getSimpleName(), - h); - } - }); - } - } - LOG.trace( - "end RequestContentObjectStoreChannelHandler channelRead0, " + - "ctx = {}, content = {}", ctx, content); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - super.exceptionCaught(ctx, cause); - IOUtils.cleanupWithLogger(null, this.reqOut, this.respIn); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java deleted file mode 100644 index add827a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.web.netty; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import org.apache.hadoop.io.IOUtils; - -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.concurrent.Future; - -import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; -import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; - -/** - * Object Store Netty channel pipeline handler that handles an inbound - * {@link HttpRequest} by dispatching it to the Object Store Jersey container. - * The handler establishes 2 sets of connected piped streams: one for inbound - * request handling and another for outbound response handling. The relevant - * ends of these pipes are handed off to the Jersey application dispatch and the - * next channel handler, which is responsible for streaming in the inbound - * request body and flushing out the response body. - */ -public final class RequestDispatchObjectStoreChannelHandler - extends ObjectStoreChannelHandler<HttpRequest> { - - private final ObjectStoreJerseyContainer jerseyContainer; - - private PipedInputStream reqIn; - private PipedOutputStream reqOut; - private PipedInputStream respIn; - private PipedOutputStream respOut; - - /** - * Creates a new RequestDispatchObjectStoreChannelHandler. - * - * @param jerseyContainer Object Store application Jersey container for - * request dispatch - */ - public RequestDispatchObjectStoreChannelHandler( - ObjectStoreJerseyContainer jerseyContainer) { - this.jerseyContainer = jerseyContainer; - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, HttpRequest nettyReq) - throws Exception { - LOG.trace("begin RequestDispatchObjectStoreChannelHandler channelRead0, " + - "ctx = {}, nettyReq = {}", ctx, nettyReq); - if (!nettyReq.getDecoderResult().isSuccess()) { - sendErrorResponse(ctx, BAD_REQUEST); - return; - } - - this.reqIn = new PipedInputStream(); - this.reqOut = new PipedOutputStream(reqIn); - this.respIn = new PipedInputStream(); - this.respOut = new PipedOutputStream(respIn); - - if (HttpHeaders.is100ContinueExpected(nettyReq)) { - LOG.trace("Sending continue response."); - ctx.writeAndFlush(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE)); - } - - Future<HttpResponse> nettyResp = this.jerseyContainer.dispatch(nettyReq, - reqIn, respOut); - - ctx.pipeline().replace(this, - RequestContentObjectStoreChannelHandler.class.getSimpleName(), - new RequestContentObjectStoreChannelHandler(nettyReq, nettyResp, - reqOut, respIn, jerseyContainer)); - - LOG.trace("end RequestDispatchObjectStoreChannelHandler channelRead0, " + - "ctx = {}, nettyReq = {}", ctx, nettyReq); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - super.exceptionCaught(ctx, cause); - IOUtils.cleanupWithLogger(null, this.reqIn, this.reqOut, this.respIn, - this.respOut); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/package-info.java deleted file mode 100644 index f4aa675..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/package-info.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * Netty-based HTTP server implementation for Ozone. - */ [email protected] -package org.apache.hadoop.ozone.web.netty; - -import org.apache.hadoop.classification.InterfaceAudience; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ozShell/Handler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ozShell/Handler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ozShell/Handler.java deleted file mode 100644 index 015d46e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ozShell/Handler.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.web.ozShell; - -import org.apache.commons.cli.CommandLine; -import org.apache.hadoop.ozone.web.client.OzoneRestClient; -import org.apache.hadoop.ozone.web.client.OzoneRestClientException; -import org.apache.hadoop.ozone.client.rest.OzoneException; -import org.apache.http.client.utils.URIBuilder; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; - -/** - * Common interface for command handling. - */ -public abstract class Handler { - protected OzoneRestClient client; - - /** - * Constructs a client object. - */ - public Handler() { - client = new OzoneRestClient(); - } - - /** - * Executes the Client command. - * - * @param cmd - CommandLine - * @throws IOException - * @throws OzoneException - * @throws URISyntaxException - */ - protected abstract void execute(CommandLine cmd) - throws IOException, OzoneException, URISyntaxException; - - /** - * verifies user provided URI. - * - * @param uri - UriString - * @return URI - * @throws URISyntaxException - * @throws OzoneException - */ - protected URI verifyURI(String uri) throws URISyntaxException, - OzoneException { - if ((uri == null) || uri.isEmpty()) { - throw new OzoneRestClientException( - "Ozone URI is needed to execute this command."); - } - URIBuilder ozoneURI = new URIBuilder(uri); - - if (ozoneURI.getPort() == 0) { - ozoneURI.setPort(Shell.DEFAULT_OZONE_PORT); - } - return ozoneURI.build(); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
