Author: szetszwo
Date: Fri Aug 1 01:29:49 2014
New Revision: 1615020

URL: http://svn.apache.org/r1615020
Log:
Merge r1609845 through r1615019 from trunk.
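A branch sync like this is typically produced by a revision-range merge from trunk into a checkout of the feature branch. The following is a minimal sketch, not the committer's recorded invocation: the working-copy path is a placeholder, and the revision range is inferred from the Propchange entry below (r1614232-1615019, i.e. everything after the previous sync).

    cd HDFS-6584-branch-checkout   # hypothetical working copy of the branch
    svn merge -r 1614231:1615019 http://svn.apache.org/repos/asf/hadoop/common/trunk .
    svn commit -m "Merge r1609845 through r1615019 from trunk."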
Added:
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java
      - copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/FsActionParam.java
      - copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/FsActionParam.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemWithXAttrs.java
      - copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemWithXAttrs.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsWithXAttrs.java
      - copied unchanged from r1615019, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsWithXAttrs.java

Modified:
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/XmlEditsVisitor.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/WebHDFS.apt.vm
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsUrl.java
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestPermissionSymlinks.java

Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1614232-1615019

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1615020&r1=1615019&r2=1615020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug 1 01:29:49 2014 @@ -204,9 +204,6 @@ Trunk (Unreleased) HDFS-3549. Fix dist tar build fails in hadoop-hdfs-raid project. (Jason Lowe via daryn) - HDFS-3482. hdfs balancer throws ArrayIndexOutOfBoundsException - if option is specified without values. ( Madhukara Phatak via umamahesh) - HDFS-3614. Revert unused MiniDFSCluster constructor from HDFS-3049. (acmurthy via eli) @@ -346,6 +343,20 @@ Release 2.6.0 - UNRELEASED HDFS-6739. Add getDatanodeStorageReport to ClientProtocol. (szetszwo) + HDFS-6665. Add tests for XAttrs in combination with viewfs. + (Stephen Chu via wang) + + HDFS-6778. The extended attributes javadoc should simply refer to the + user docs. (clamb via wang) + + HDFS-6570. add api that enables checking if a user has certain permissions on + a file. (Jitendra Pandey via cnauroth) + + HDFS-6441. Add ability to exclude/include specific datanodes while + balancing. (Benoy Antony and Yu Li via Arpit Agarwal) + + HDFS-6685. Balancer should preserve storage type of replicas. (szetszwo) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) @@ -408,6 +419,16 @@ Release 2.6.0 - UNRELEASED HDFS-6749. FSNamesystem methods should call resolvePath. (Charles Lamb via cnauroth) + HDFS-4629. Using com.sun.org.apache.xml.internal.serialize.* in + XmlEditsVisitor.java is JVM vendor specific. Breaks IBM JAVA. + (Amir Sanjar via stevel) + + HDFS-3482. hdfs balancer throws ArrayIndexOutOfBoundsException + if option is specified without values. ( Madhukara Phatak via umamahesh) + + HDFS-6797. DataNode logs wrong layoutversion during upgrade. (Benoy Antony + via Arpit Agarwal) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES @@ -963,6 +984,9 @@ Release 2.5.0 - UNRELEASED HDFS-6717. JIRA HDFS-5804 breaks default nfs-gateway behavior for unsecured config (brandonli) + HDFS-6768. Fix a few unit tests that use hard-coded port numbers. (Arpit + Agarwal) + BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh) Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml?rev=1615020&r1=1615019&r2=1615020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/pom.xml Fri Aug 1 01:29:49 2014 @@ -176,6 +176,11 @@ http://maven.apache.org/xsd/maven-4.0.0. 
<artifactId>netty</artifactId> <scope>compile</scope> </dependency> + <dependency> + <groupId>xerces</groupId> + <artifactId>xercesImpl</artifactId> + <scope>compile</scope> + </dependency> </dependencies> <build> Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1614232-1615019 Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1615020&r1=1615019&r2=1615020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Fri Aug 1 01:29:49 2014 @@ -33,6 +33,7 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.hdfs.CorruptFileBlockIterator; @@ -448,6 +449,11 @@ public class Hdfs extends AbstractFileSy dfs.removeXAttr(getUriPath(path), name); } + @Override + public void access(Path path, final FsAction mode) throws IOException { + dfs.checkAccess(getUriPath(path), mode); + } + /** * Renew an existing delegation token. 
* Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1615020&r1=1615019&r2=1615020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Aug 1 01:29:49 2014 @@ -122,6 +122,7 @@ import org.apache.hadoop.fs.XAttrSetFlag import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.net.Peer; @@ -2832,6 +2833,17 @@ public class DFSClient implements java.i } } + public void checkAccess(String src, FsAction mode) throws IOException { + checkOpen(); + try { + namenode.checkAccess(src, mode); + } catch (RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + FileNotFoundException.class, + UnresolvedPathException.class); + } + } + @Override // RemotePeerFactory public Peer newConnectedPeer(InetSocketAddress addr, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1615020&r1=1615019&r2=1615020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Aug 1 01:29:49 2014 @@ -59,6 +59,7 @@ import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -1898,4 +1899,23 @@ public class DistributedFileSystem exten } }.resolve(this, absF); } + + @Override + public void access(Path path, final FsAction mode) throws IOException { + final Path absF = fixRelativePart(path); + new FileSystemLinkResolver<Void>() { + @Override + public Void doCall(final Path p) throws IOException { + dfs.checkAccess(getPathName(p), mode); + return null; + } + + @Override + public Void next(final FileSystem fs, final Path p) + throws IOException { + fs.access(p, mode); + return null; + } + }.resolve(this, absF); + } } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java?rev=1615020&r1=1615019&r2=1615020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java Fri Aug 1 01:29:49 2014 @@ -18,6 +18,9 @@ package org.apache.hadoop.hdfs; +import java.util.Arrays; +import java.util.List; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -35,4 +38,10 @@ public enum StorageType { public static final StorageType DEFAULT = DISK; public static final StorageType[] EMPTY_ARRAY = {}; -} \ No newline at end of file + + private static final StorageType[] VALUES = values(); + + public static List<StorageType> asList() { + return Arrays.asList(VALUES); + } +} Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1615020&r1=1615019&r2=1615020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Aug 1 01:29:49 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; @@ -1267,17 +1268,11 @@ public interface ClientProtocol { /** * Set xattr of a file or directory. - * A regular user only can set xattr of "user" namespace. - * A super user can set xattr of "user" and "trusted" namespace. - * XAttr of "security" and "system" namespace is only used/exposed - * internally to the FS impl. + * The name must be prefixed with the namespace followed by ".". For example, + * "user.attr". * <p/> - * For xattr of "user" namespace, its access permissions are - * defined by the file or directory permission bits. - * XAttr will be set only when login user has correct permissions. - * <p/> - * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes"> - * http://en.wikipedia.org/wiki/Extended_file_attributes</a> + * Refer to the HDFS extended attributes user documentation for details. + * * @param src file or directory * @param xAttr <code>XAttr</code> to set * @param flag set flag @@ -1288,18 +1283,13 @@ public interface ClientProtocol { throws IOException; /** - * Get xattrs of file or directory. Values in xAttrs parameter are ignored. - * If xattrs is null or empty, equals getting all xattrs of the file or - * directory. - * Only xattrs which login user has correct permissions will be returned. 
- * <p/> - * A regular user only can get xattr of "user" namespace. - * A super user can get xattr of "user" and "trusted" namespace. - * XAttr of "security" and "system" namespace is only used/exposed - * internally to the FS impl. + * Get xattrs of a file or directory. Values in xAttrs parameter are ignored. + * If xAttrs is null or empty, this is the same as getting all xattrs of the + * file or directory. Only those xattrs for which the logged-in user has + * permissions to view are returned. * <p/> - * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes"> - * http://en.wikipedia.org/wiki/Extended_file_attributes</a> + * Refer to the HDFS extended attributes user documentation for details. + * * @param src file or directory * @param xAttrs xAttrs to get * @return List<XAttr> <code>XAttr</code> list @@ -1314,13 +1304,8 @@ public interface ClientProtocol { * Only the xattr names for which the logged in user has the permissions to * access will be returned. * <p/> - * A regular user only can get xattr names from the "user" namespace. - * A super user can get xattr names of the "user" and "trusted" namespace. - * XAttr names of the "security" and "system" namespaces are only used/exposed - * internally by the file system impl. - * <p/> - * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes"> - * http://en.wikipedia.org/wiki/Extended_file_attributes</a> + * Refer to the HDFS extended attributes user documentation for details. + * * @param src file or directory * @param xAttrs xAttrs to get * @return List<XAttr> <code>XAttr</code> list @@ -1332,19 +1317,33 @@ public interface ClientProtocol { /** * Remove xattr of a file or directory.Value in xAttr parameter is ignored. - * Name must be prefixed with user/trusted/security/system. + * The name must be prefixed with the namespace followed by ".". For example, + * "user.attr". * <p/> - * A regular user only can remove xattr of "user" namespace. - * A super user can remove xattr of "user" and "trusted" namespace. - * XAttr of "security" and "system" namespace is only used/exposed - * internally to the FS impl. - * <p/> - * @see <a href="http://en.wikipedia.org/wiki/Extended_file_attributes"> - * http://en.wikipedia.org/wiki/Extended_file_attributes</a> + * Refer to the HDFS extended attributes user documentation for details. + * * @param src file or directory * @param xAttr <code>XAttr</code> to remove * @throws IOException */ @AtMostOnce public void removeXAttr(String src, XAttr xAttr) throws IOException; + + /** + * Checks if the user can access a path. The mode specifies which access + * checks to perform. If the requested permissions are granted, then the + * method returns normally. If access is denied, then the method throws an + * {@link AccessControlException}. + * In general, applications should avoid using this method, due to the risk of + * time-of-check/time-of-use race conditions. The permissions on a file may + * change immediately after the access call returns. 
+ * + * @param path Path to check + * @param mode type of access to check + * @throws AccessControlException if access is denied + * @throws FileNotFoundException if the path does not exist + * @throws IOException see specific implementation + */ + @Idempotent + public void checkAccess(String path, FsAction mode) throws IOException; } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1615020&r1=1615019&r2=1615020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Fri Aug 1 01:29:49 2014 @@ -174,6 +174,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; @@ -320,6 +322,9 @@ public class ClientNamenodeProtocolServe private static final RemoveXAttrResponseProto VOID_REMOVEXATTR_RESPONSE = RemoveXAttrResponseProto.getDefaultInstance(); + private static final CheckAccessResponseProto + VOID_CHECKACCESS_RESPONSE = CheckAccessResponseProto.getDefaultInstance(); + /** * Constructor * @@ -1338,4 +1343,15 @@ public class ClientNamenodeProtocolServe } return VOID_REMOVEXATTR_RESPONSE; } + + @Override + public CheckAccessResponseProto checkAccess(RpcController controller, + CheckAccessRequestProto req) throws ServiceException { + try { + server.checkAccess(req.getPath(), PBHelper.convert(req.getMode())); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_CHECKACCESS_RESPONSE; + } } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1615020&r1=1615019&r2=1615020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original) +++ 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Fri Aug 1 01:29:49 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -144,6 +145,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto; @@ -1346,4 +1348,15 @@ public class ClientNamenodeProtocolTrans throw ProtobufHelper.getRemoteException(e); } } + + @Override + public void checkAccess(String path, FsAction mode) throws IOException { + CheckAccessRequestProto req = CheckAccessRequestProto.newBuilder() + .setPath(path).setMode(PBHelper.convert(mode)).build(); + try { + rpcProxy.checkAccess(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1615020&r1=1615019&r2=1615020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Aug 1 01:29:49 2014 @@ -352,15 +352,19 @@ public class PBHelper { return BlockWithLocationsProto.newBuilder() .setBlock(convert(blk.getBlock())) .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids())) - .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build(); + .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())) + .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes())) + .build(); } public static BlockWithLocations convert(BlockWithLocationsProto b) { final List<String> datanodeUuids = b.getDatanodeUuidsList(); final List<String> storageUuids = b.getStorageUuidsList(); + final List<StorageTypeProto> storageTypes = b.getStorageTypesList(); return new BlockWithLocations(convert(b.getBlock()), datanodeUuids.toArray(new String[datanodeUuids.size()]), - storageUuids.toArray(new String[storageUuids.size()])); + storageUuids.toArray(new String[storageUuids.size()]), + convertStorageTypes(storageTypes, storageUuids.size())); } public static BlocksWithLocationsProto convert(BlocksWithLocations blks) { @@ -2111,11 +2115,11 @@ public class PBHelper { 
return castEnum(v, XATTR_NAMESPACE_VALUES); } - private static FsActionProto convert(FsAction v) { + public static FsActionProto convert(FsAction v) { return FsActionProto.valueOf(v != null ? v.ordinal() : 0); } - private static FsAction convert(FsActionProto v) { + public static FsAction convert(FsActionProto v) { return castEnum(v, FSACTION_VALUES); } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1615020&r1=1615019&r2=1615020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Aug 1 01:29:49 2014 @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.EnumMap; import java.util.Formatter; import java.util.HashMap; import java.util.HashSet; @@ -45,6 +46,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -78,16 +80,21 @@ import org.apache.hadoop.hdfs.server.blo import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import com.google.common.base.Preconditions; + /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster * when some datanodes become full or when new empty nodes join the cluster. 
* The tool is deployed as an application program that can be run by the @@ -188,7 +195,9 @@ import org.apache.hadoop.util.ToolRunner @InterfaceAudience.Private public class Balancer { static final Log LOG = LogFactory.getLog(Balancer.class); - final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB + final private static long GB = 1L << 30; //1GB + final private static long MAX_SIZE_TO_MOVE = 10*GB; + final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB; private static long WIN_WIDTH = 5400*1000L; // 1.5 hour /** The maximum number of concurrent blocks moves for @@ -203,34 +212,38 @@ public class Balancer { + "\n\t[-policy <policy>]\tthe balancing policy: " + BalancingPolicy.Node.INSTANCE.getName() + " or " + BalancingPolicy.Pool.INSTANCE.getName() - + "\n\t[-threshold <threshold>]\tPercentage of disk capacity"; + + "\n\t[-threshold <threshold>]\tPercentage of disk capacity" + + "\n\t[-exclude [-f <hosts-file> | comma-separated list of hosts]]" + + "\tExcludes the specified datanodes." + + "\n\t[-include [-f <hosts-file> | comma-separated list of hosts]]" + + "\tIncludes only the specified datanodes."; private final NameNodeConnector nnc; private final BalancingPolicy policy; private final SaslDataTransferClient saslClient; private final double threshold; + // set of data nodes to be excluded from balancing operations. + Set<String> nodesToBeExcluded; + //Restrict balancing to the following nodes. + Set<String> nodesToBeIncluded; // all data node lists - private final Collection<Source> overUtilizedDatanodes - = new LinkedList<Source>(); - private final Collection<Source> aboveAvgUtilizedDatanodes - = new LinkedList<Source>(); - private final Collection<BalancerDatanode> belowAvgUtilizedDatanodes - = new LinkedList<BalancerDatanode>(); - private final Collection<BalancerDatanode> underUtilizedDatanodes - = new LinkedList<BalancerDatanode>(); - - private final Collection<Source> sources - = new HashSet<Source>(); - private final Collection<BalancerDatanode> targets - = new HashSet<BalancerDatanode>(); + private final Collection<Source> overUtilized = new LinkedList<Source>(); + private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>(); + private final Collection<BalancerDatanode.StorageGroup> belowAvgUtilized + = new LinkedList<BalancerDatanode.StorageGroup>(); + private final Collection<BalancerDatanode.StorageGroup> underUtilized + = new LinkedList<BalancerDatanode.StorageGroup>(); + + private final Collection<Source> sources = new HashSet<Source>(); + private final Collection<BalancerDatanode.StorageGroup> targets + = new HashSet<BalancerDatanode.StorageGroup>(); private final Map<Block, BalancerBlock> globalBlockList = new HashMap<Block, BalancerBlock>(); private final MovedBlocks movedBlocks = new MovedBlocks(); - /** Map (datanodeUuid -> BalancerDatanodes) */ - private final Map<String, BalancerDatanode> datanodeMap - = new HashMap<String, BalancerDatanode>(); + /** Map (datanodeUuid,storageType -> StorageGroup) */ + private final StorageGroupMap storageGroupMap = new StorageGroupMap(); private NetworkTopology cluster; @@ -238,12 +251,39 @@ public class Balancer { private final ExecutorService dispatcherExecutor; private final int maxConcurrentMovesPerNode; + + private static class StorageGroupMap { + private static String toKey(String datanodeUuid, StorageType storageType) { + return datanodeUuid + ":" + storageType; + } + + private final Map<String, BalancerDatanode.StorageGroup> map + = new HashMap<String,
BalancerDatanode.StorageGroup>(); + + BalancerDatanode.StorageGroup get(String datanodeUuid, StorageType storageType) { + return map.get(toKey(datanodeUuid, storageType)); + } + + void put(BalancerDatanode.StorageGroup g) { + final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType); + final BalancerDatanode.StorageGroup existing = map.put(key, g); + Preconditions.checkState(existing == null); + } + + int size() { + return map.size(); + } + + void clear() { + map.clear(); + } + } /* This class keeps track of a scheduled block move */ private class PendingBlockMove { private BalancerBlock block; private Source source; private BalancerDatanode proxySource; - private BalancerDatanode target; + private BalancerDatanode.StorageGroup target; /** constructor */ private PendingBlockMove() { @@ -254,7 +294,7 @@ public class Balancer { final Block b = block.getBlock(); return b + " with size=" + b.getNumBytes() + " from " + source.getDisplayName() + " to " + target.getDisplayName() - + " through " + proxySource.getDisplayName(); + + " through " + proxySource.datanode; } /* choose a block & a proxy source for this pendingMove @@ -306,20 +346,20 @@ public class Balancer { final DatanodeInfo targetDN = target.getDatanode(); // if node group is supported, first try add nodes in the same node group if (cluster.isNodeGroupAware()) { - for (BalancerDatanode loc : block.getLocations()) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) { return true; } } } // check if there is replica which is on the same rack with the target - for (BalancerDatanode loc : block.getLocations()) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) { return true; } } // find out a non-busy replica - for (BalancerDatanode loc : block.getLocations()) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (addTo(loc)) { return true; } @@ -327,8 +367,9 @@ public class Balancer { return false; } - // add a BalancerDatanode as proxy source for specific block movement - private boolean addTo(BalancerDatanode bdn) { + /** add to a proxy source for specific block movement */ + private boolean addTo(BalancerDatanode.StorageGroup g) { + final BalancerDatanode bdn = g.getBalancerDatanode(); if (bdn.addPendingBlock(this)) { proxySource = bdn; return true; @@ -344,7 +385,7 @@ public class Balancer { DataInputStream in = null; try { sock.connect( - NetUtils.createSocketAddr(target.datanode.getXferAddr()), + NetUtils.createSocketAddr(target.getDatanode().getXferAddr()), HdfsServerConstants.READ_TIMEOUT); /* Unfortunately we don't have a good way to know if the Datanode is * taking a really long time to move a block, OR something has @@ -361,7 +402,7 @@ public class Balancer { ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb); IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, nnc, accessToken, target.datanode); + unbufIn, nnc, accessToken, target.getDatanode()); unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, @@ -381,14 +422,14 @@ public class Balancer { * gets out of sync with work going on in datanode. 
*/ proxySource.activateDelay(DELAY_AFTER_ERROR); - target.activateDelay(DELAY_AFTER_ERROR); + target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR); } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); IOUtils.closeSocket(sock); proxySource.removePendingBlock(this); - target.removePendingBlock(this); + target.getBalancerDatanode().removePendingBlock(this); synchronized (this ) { reset(); @@ -404,7 +445,7 @@ public class Balancer { StorageType storageType, Token<BlockTokenIdentifier> accessToken) throws IOException { new Sender(out).replaceBlock(eb, storageType, accessToken, - source.getStorageID(), proxySource.getDatanode()); + source.getDatanode().getDatanodeUuid(), proxySource.datanode); } /* Receive a block copy response from the input stream */ @@ -444,8 +485,9 @@ public class Balancer { /* A class for keeping track of blocks in the Balancer */ static private class BalancerBlock { private final Block block; // the block - private final List<BalancerDatanode> locations - = new ArrayList<BalancerDatanode>(3); // its locations + /** The locations of the replicas of the block. */ + private final List<BalancerDatanode.StorageGroup> locations + = new ArrayList<BalancerDatanode.StorageGroup>(3); /* Constructor */ private BalancerBlock(Block block) { @@ -458,20 +500,19 @@ public class Balancer { } /* add a location */ - private synchronized void addLocation(BalancerDatanode datanode) { - if (!locations.contains(datanode)) { - locations.add(datanode); + private synchronized void addLocation(BalancerDatanode.StorageGroup g) { + if (!locations.contains(g)) { + locations.add(g); } } - /* Return if the block is located on <code>datanode</code> */ - private synchronized boolean isLocatedOnDatanode( - BalancerDatanode datanode) { - return locations.contains(datanode); + /** @return if the block is located on the given storage group. */ + private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) { + return locations.contains(g); } /* Return its locations */ - private synchronized List<BalancerDatanode> getLocations() { + private synchronized List<BalancerDatanode.StorageGroup> getLocations() { return locations; } @@ -488,37 +529,84 @@ public class Balancer { /* The class represents a desired move of bytes between two nodes * and the target. - * An object of this class is stored in a source node. + * An object of this class is stored in a source. */ - static private class NodeTask { - private final BalancerDatanode datanode; //target node + static private class Task { + private final BalancerDatanode.StorageGroup target; private long size; //bytes scheduled to move /* constructor */ - private NodeTask(BalancerDatanode datanode, long size) { - this.datanode = datanode; + private Task(BalancerDatanode.StorageGroup target, long size) { + this.target = target; this.size = size; } - - /* Get the node */ - private BalancerDatanode getDatanode() { - return datanode; - } - - /* Get the number of bytes that need to be moved */ - private long getSize() { - return size; - } } /* A class that keeps track of a datanode in Balancer */ private static class BalancerDatanode { - final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB + + /** A group of storages in a datanode with the same storage type. 
*/ + private class StorageGroup { + final StorageType storageType; + final double utilization; + final long maxSize2Move; + private long scheduledSize = 0L; + + private StorageGroup(StorageType storageType, double utilization, + long maxSize2Move) { + this.storageType = storageType; + this.utilization = utilization; + this.maxSize2Move = maxSize2Move; + } + + BalancerDatanode getBalancerDatanode() { + return BalancerDatanode.this; + } + + DatanodeInfo getDatanode() { + return BalancerDatanode.this.datanode; + } + + /** Decide if still need to move more bytes */ + protected synchronized boolean hasSpaceForScheduling() { + return availableSizeToMove() > 0L; + } + + /** @return the total number of bytes that need to be moved */ + synchronized long availableSizeToMove() { + return maxSize2Move - scheduledSize; + } + + /** increment scheduled size */ + synchronized void incScheduledSize(long size) { + scheduledSize += size; + } + + /** @return scheduled size */ + synchronized long getScheduledSize() { + return scheduledSize; + } + + /** Reset scheduled size to zero. */ + synchronized void resetScheduledSize() { + scheduledSize = 0L; + } + + /** @return the name for display */ + String getDisplayName() { + return datanode + ":" + storageType; + } + + @Override + public String toString() { + return "" + utilization; + } + } + final DatanodeInfo datanode; - final double utilization; - final long maxSize2Move; - private long scheduledSize = 0L; + final EnumMap<StorageType, StorageGroup> storageMap + = new EnumMap<StorageType, StorageGroup>(StorageType.class); protected long delayUntil = 0L; // blocks being moved but not confirmed yet private final List<PendingBlockMove> pendingBlocks; @@ -526,78 +614,38 @@ public class Balancer { @Override public String toString() { - return getClass().getSimpleName() + "[" + datanode - + ", utilization=" + utilization + "]"; + return getClass().getSimpleName() + ":" + datanode + ":" + storageMap; } /* Constructor * Depending on avgutil & threshold, calculate maximum bytes to move */ - private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold, - int maxConcurrentMoves) { - datanode = node; - utilization = policy.getUtilization(node); - final double avgUtil = policy.getAvgUtilization(); - long maxSizeToMove; - - if (utilization >= avgUtil+threshold - || utilization <= avgUtil-threshold) { - maxSizeToMove = (long)(threshold*datanode.getCapacity()/100); - } else { - maxSizeToMove = - (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100); - } - if (utilization < avgUtil ) { - maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove); - } - this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove); + private BalancerDatanode(DatanodeStorageReport report, + double threshold, int maxConcurrentMoves) { + this.datanode = report.getDatanodeInfo(); this.maxConcurrentMoves = maxConcurrentMoves; this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves); } - /** Get the datanode */ - protected DatanodeInfo getDatanode() { - return datanode; - } - - /** Get the name of the datanode */ - protected String getDisplayName() { - return datanode.toString(); - } - - /* Get the storage id of the datanode */ - protected String getStorageID() { - return datanode.getDatanodeUuid(); - } - - /** Decide if still need to move more bytes */ - protected synchronized boolean hasSpaceForScheduling() { - return scheduledSize<maxSize2Move; - } - - /** Return the total number of bytes that need to be moved */ - protected synchronized long 
availableSizeToMove() { - return maxSize2Move-scheduledSize; - } - - /** increment scheduled size */ - protected synchronized void incScheduledSize(long size) { - scheduledSize += size; - } - - /** decrement scheduled size */ - protected synchronized void decScheduledSize(long size) { - scheduledSize -= size; - } - - /** get scheduled size */ - protected synchronized long getScheduledSize(){ - return scheduledSize; - } - - /** get scheduled size */ - protected synchronized void setScheduledSize(long size){ - scheduledSize = size; + private void put(StorageType storageType, StorageGroup g) { + final StorageGroup existing = storageMap.put(storageType, g); + Preconditions.checkState(existing == null); + } + + StorageGroup addStorageGroup(StorageType storageType, double utilization, + long maxSize2Move) { + final StorageGroup g = new StorageGroup(storageType, utilization, + maxSize2Move); + put(storageType, g); + return g; + } + + Source addSource(StorageType storageType, double utilization, + long maxSize2Move, Balancer balancer) { + final Source s = balancer.new Source(storageType, utilization, + maxSize2Move, this); + put(storageType, s); + return s; } synchronized private void activateDelay(long delta) { @@ -640,9 +688,9 @@ public class Balancer { return pendingBlocks.remove(pendingBlock); } } - + /** A node that can be the sources of a block move */ - private class Source extends BalancerDatanode { + private class Source extends BalancerDatanode.StorageGroup { /* A thread that initiates a block move * and waits for block move to complete */ @@ -653,7 +701,7 @@ public class Balancer { } } - private final ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2); + private final List<Task> tasks = new ArrayList<Task>(2); private long blocksToReceive = 0L; /* source blocks point to balancerBlocks in the global list because * we want to keep one copy of a block in balancer and be aware that @@ -663,17 +711,17 @@ public class Balancer { = new ArrayList<BalancerBlock>(); /* constructor */ - private Source(DatanodeInfo node, BalancingPolicy policy, double threshold, - int maxConcurrentMoves) { - super(node, policy, threshold, maxConcurrentMoves); + private Source(StorageType storageType, double utilization, + long maxSize2Move, BalancerDatanode dn) { + dn.super(storageType, utilization, maxSize2Move); } - /** Add a node task */ - private void addNodeTask(NodeTask task) { - assert (task.datanode != this) : - "Source and target are the same " + datanode; - incScheduledSize(task.getSize()); - nodeTasks.add(task); + /** Add a task */ + private void addTask(Task task) { + Preconditions.checkState(task.target != this, + "Source and target are the same storage group " + getDisplayName()); + incScheduledSize(task.size); + tasks.add(task); } /* Return an iterator to this source's blocks */ @@ -686,8 +734,10 @@ public class Balancer { * Return the total size of the received blocks in the number of bytes. 
*/ private long getBlockList() throws IOException { - BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode, - Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks(); + final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); + final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks( + getDatanode(), size).getBlocks(); + long bytesReceived = 0; for (BlockWithLocations blk : newBlocks) { bytesReceived += blk.getBlock().getNumBytes(); @@ -703,10 +753,13 @@ public class Balancer { synchronized (block) { // update locations - for (String datanodeUuid : blk.getDatanodeUuids()) { - final BalancerDatanode d = datanodeMap.get(datanodeUuid); - if (d != null) { // not an unknown datanode - block.addLocation(d); + final String[] datanodeUuids = blk.getDatanodeUuids(); + final StorageType[] storageTypes = blk.getStorageTypes(); + for (int i = 0; i < datanodeUuids.length; i++) { + final BalancerDatanode.StorageGroup g = storageGroupMap.get( + datanodeUuids[i], storageTypes[i]); + if (g != null) { // not unknown + block.addLocation(g); } } } @@ -721,8 +774,8 @@ public class Balancer { /* Decide if the given block is a good candidate to move or not */ private boolean isGoodBlockCandidate(BalancerBlock block) { - for (NodeTask nodeTask : nodeTasks) { - if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) { + for (Task t : tasks) { + if (Balancer.this.isGoodBlockCandidate(this, t.target, block)) { return true; } } @@ -737,20 +790,20 @@ public class Balancer { * The block should be dispatched immediately after this method is returned. */ private PendingBlockMove chooseNextBlockToMove() { - for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) { - NodeTask task = tasks.next(); - BalancerDatanode target = task.getDatanode(); + for (Iterator<Task> i = tasks.iterator(); i.hasNext();) { + final Task task = i.next(); + final BalancerDatanode target = task.target.getBalancerDatanode(); PendingBlockMove pendingBlock = new PendingBlockMove(); if (target.addPendingBlock(pendingBlock)) { // target is not busy, so do a tentative block allocation pendingBlock.source = this; - pendingBlock.target = target; + pendingBlock.target = task.target; if ( pendingBlock.chooseBlockAndProxy() ) { long blockSize = pendingBlock.block.getNumBytes(); - decScheduledSize(blockSize); + incScheduledSize(-blockSize); task.size -= blockSize; if (task.size == 0) { - tasks.remove(); + i.remove(); } return pendingBlock; } else { @@ -824,7 +877,7 @@ public class Balancer { // in case no blocks can be moved for source node's task, // jump out of while-loop after 5 iterations. if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) { - setScheduledSize(0); + resetScheduledSize(); } } @@ -869,6 +922,8 @@ public class Balancer { Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { this.threshold = p.threshold; this.policy = p.policy; + this.nodesToBeExcluded = p.nodesToBeExcluded; + this.nodesToBeIncluded = p.nodesToBeIncluded; this.nnc = theblockpool; cluster = NetworkTopology.getInstance(conf); @@ -889,95 +944,154 @@ public class Balancer { IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); } - /* Given a data node set, build a network topology and decide - * over-utilized datanodes, above average utilized datanodes, - * below average utilized datanodes, and underutilized datanodes. 
- * The input data node set is shuffled before the datanodes - * are put into the over-utilized datanodes, above average utilized - * datanodes, below average utilized datanodes, and - * underutilized datanodes lists. This will add some randomness - * to the node matching later on. - * + + private static long getCapacity(DatanodeStorageReport report, StorageType t) { + long capacity = 0L; + for(StorageReport r : report.getStorageReports()) { + if (r.getStorage().getStorageType() == t) { + capacity += r.getCapacity(); + } + } + return capacity; + } + + private static long getRemaining(DatanodeStorageReport report, StorageType t) { + long remaining = 0L; + for(StorageReport r : report.getStorageReports()) { + if (r.getStorage().getStorageType() == t) { + remaining += r.getRemaining(); + } + } + return remaining; + } + + private boolean shouldIgnore(DatanodeInfo dn) { + //ignore decommissioned nodes + final boolean decommissioned = dn.isDecommissioned(); + //ignore decommissioning nodes + final boolean decommissioning = dn.isDecommissionInProgress(); + // ignore nodes in exclude list + final boolean excluded = Util.shouldBeExcluded(nodesToBeExcluded, dn); + // ignore nodes not in the include list (if include list is not empty) + final boolean notIncluded = !Util.shouldBeIncluded(nodesToBeIncluded, dn); + + if (decommissioned || decommissioning || excluded || notIncluded) { + if (LOG.isTraceEnabled()) { + LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", " + + decommissioning + ", " + excluded + ", " + notIncluded); + } + return true; + } + return false; + } + + /** + * Given a datanode storage set, build a network topology and decide + * over-utilized storages, above average utilized storages, + * below average utilized storages, and underutilized storages. + * The input datanode storage set is shuffled in order to randomize + * to the storage matching later on. + * * @return the total number of bytes that are * needed to move to make the cluster balanced. - * @param datanodes a set of datanodes + * @param reports a set of datanode storage reports */ - private long initNodes(DatanodeInfo[] datanodes) { + private long init(DatanodeStorageReport[] reports) { // compute average utilization - for (DatanodeInfo datanode : datanodes) { - if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) { - continue; // ignore decommissioning or decommissioned nodes + for (DatanodeStorageReport r : reports) { + if (shouldIgnore(r.getDatanodeInfo())) { + continue; } - policy.accumulateSpaces(datanode); + policy.accumulateSpaces(r); } policy.initAvgUtilization(); - /*create network topology and all data node lists: - * overloaded, above-average, below-average, and underloaded - * we alternates the accessing of the given datanodes array either by - * an increasing order or a decreasing order. - */ + // create network topology and classify utilization collections: + // over-utilized, above-average, below-average and under-utilized. 
long overLoadedBytes = 0L, underLoadedBytes = 0L; - for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) { - if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) { + for(DatanodeStorageReport r : DFSUtil.shuffle(reports)) { + final DatanodeInfo datanode = r.getDatanodeInfo(); + if (shouldIgnore(datanode)) { continue; // ignore decommissioning or decommissioned nodes } cluster.add(datanode); - BalancerDatanode datanodeS; - final double avg = policy.getAvgUtilization(); - if (policy.getUtilization(datanode) >= avg) { - datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode); - if (isAboveAvgUtilized(datanodeS)) { - this.aboveAvgUtilizedDatanodes.add((Source)datanodeS); - } else { - assert(isOverUtilized(datanodeS)) : - datanodeS.getDisplayName()+ "is not an overUtilized node"; - this.overUtilizedDatanodes.add((Source)datanodeS); - overLoadedBytes += (long)((datanodeS.utilization-avg - -threshold)*datanodeS.datanode.getCapacity()/100.0); + + final BalancerDatanode dn = new BalancerDatanode(r, threshold, + maxConcurrentMovesPerNode); + for(StorageType t : StorageType.asList()) { + final Double utilization = policy.getUtilization(r, t); + if (utilization == null) { // datanode does not have such storage type + continue; } - } else { - datanodeS = new BalancerDatanode(datanode, policy, threshold, - maxConcurrentMovesPerNode); - if ( isBelowOrEqualAvgUtilized(datanodeS)) { - this.belowAvgUtilizedDatanodes.add(datanodeS); + + final long capacity = getCapacity(r, t); + final double utilizationDiff = utilization - policy.getAvgUtilization(t); + final double thresholdDiff = Math.abs(utilizationDiff) - threshold; + final long maxSize2Move = computeMaxSize2Move(capacity, + getRemaining(r, t), utilizationDiff, threshold); + + final BalancerDatanode.StorageGroup g; + if (utilizationDiff > 0) { + final Source s = dn.addSource(t, utilization, maxSize2Move, this); + if (thresholdDiff <= 0) { // within threshold + aboveAvgUtilized.add(s); + } else { + overLoadedBytes += precentage2bytes(thresholdDiff, capacity); + overUtilized.add(s); + } + g = s; } else { - assert isUnderUtilized(datanodeS) : "isUnderUtilized(" - + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS) - + ", utilization=" + datanodeS.utilization; - this.underUtilizedDatanodes.add(datanodeS); - underLoadedBytes += (long)((avg-threshold- - datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0); + g = dn.addStorageGroup(t, utilization, maxSize2Move); + if (thresholdDiff <= 0) { // within threshold + belowAvgUtilized.add(g); + } else { + underLoadedBytes += precentage2bytes(thresholdDiff, capacity); + underUtilized.add(g); + } } + storageGroupMap.put(g); } - datanodeMap.put(datanode.getDatanodeUuid(), datanodeS); } - //logging - logNodes(); + logUtilizationCollections(); - assert (this.datanodeMap.size() == - overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+ - aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()) - : "Mismatched number of datanodes"; + Preconditions.checkState(storageGroupMap.size() == overUtilized.size() + + underUtilized.size() + aboveAvgUtilized.size() + belowAvgUtilized.size(), + "Mismatched number of storage groups"); // return number of bytes to be moved in order to make the cluster balanced return Math.max(overLoadedBytes, underLoadedBytes); } + private static long computeMaxSize2Move(final long capacity, final long remaining, + final double utilizationDiff, final double threshold) { + final double diff = Math.min(threshold,
+
+  private static long computeMaxSize2Move(final long capacity, final long remaining,
+      final double utilizationDiff, final double threshold) {
+    final double diff = Math.min(threshold, Math.abs(utilizationDiff));
+    long maxSizeToMove = percentage2bytes(diff, capacity);
+    if (utilizationDiff < 0) {
+      maxSizeToMove = Math.min(remaining, maxSizeToMove);
+    }
+    return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+  }
+
+  private static long percentage2bytes(double percentage, long capacity) {
+    Preconditions.checkArgument(percentage >= 0,
+        "percentage = " + percentage + " < 0");
+    return (long)(percentage * capacity / 100.0);
+  }
+
   /* log the over utilized & under utilized nodes */
-  private void logNodes() {
-    logNodes("over-utilized", overUtilizedDatanodes);
+  private void logUtilizationCollections() {
+    logUtilizationCollection("over-utilized", overUtilized);
     if (LOG.isTraceEnabled()) {
-      logNodes("above-average", aboveAvgUtilizedDatanodes);
-      logNodes("below-average", belowAvgUtilizedDatanodes);
+      logUtilizationCollection("above-average", aboveAvgUtilized);
+      logUtilizationCollection("below-average", belowAvgUtilized);
     }
-    logNodes("underutilized", underUtilizedDatanodes);
+    logUtilizationCollection("underutilized", underUtilized);
   }

-  private static <T extends BalancerDatanode> void logNodes(
-      String name, Collection<T> nodes) {
-    LOG.info(nodes.size() + " " + name + ": " + nodes);
+  private static <T extends BalancerDatanode.StorageGroup>
+      void logUtilizationCollection(String name, Collection<T> items) {
+    LOG.info(items.size() + " " + name + ": " + items);
   }

   /** A matcher interface for matching nodes. */
@@ -1013,26 +1127,24 @@ public class Balancer {
   /**
    * Decide all <source, target> pairs and
    * the number of bytes to move from a source to a target
-   * Maximum bytes to be moved per node is
-   * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
-   * Return total number of bytes to move in this iteration
+   * Maximum bytes to be moved per storage group is
+   * min(one bandwidth's worth of bytes, MAX_SIZE_TO_MOVE).
+   * @return total number of bytes to move in this iteration
    */
-  private long chooseNodes() {
+  private long chooseStorageGroups() {
     // First, match nodes on the same node group if cluster is node group aware
     if (cluster.isNodeGroupAware()) {
-      chooseNodes(SAME_NODE_GROUP);
+      chooseStorageGroups(SAME_NODE_GROUP);
     }

     // Then, match nodes on the same rack
-    chooseNodes(SAME_RACK);
+    chooseStorageGroups(SAME_RACK);

     // At last, match all remaining nodes
-    chooseNodes(ANY_OTHER);
+    chooseStorageGroups(ANY_OTHER);

-    assert (datanodeMap.size() >= sources.size()+targets.size())
-      : "Mismatched number of datanodes (" +
-        datanodeMap.size() + " total, " +
-        sources.size() + " sources, " +
-        targets.size() + " targets)";
+    Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(),
+        "Mismatched number of datanodes (" + storageGroupMap.size() + " < "
+        + sources.size() + " sources, " + targets.size() + " targets)");

     long bytesToMove = 0L;
     for (Source src : sources) {
@@ -1042,25 +1154,25 @@ public class Balancer {
   }

   /** Decide all <source, target> pairs according to the matcher. */
-  private void chooseNodes(final Matcher matcher) {
+  private void chooseStorageGroups(final Matcher matcher) {
    /* first step: match each overUtilized datanode (source) to
     * one or more underUtilized datanodes (targets).
     */
-    chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
+    chooseStorageGroups(overUtilized, underUtilized, matcher);

    /* match each remaining overutilized datanode (source) to
     * below average utilized datanodes (targets).
     * Note only overutilized datanodes that haven't had their max bytes to move
     * satisfied in step 1 are selected
     */
-    chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
+    chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);

    /* match each remaining underutilized datanode (target) to
     * above average utilized datanodes (source).
     * Note only underutilized datanodes that have not had their max bytes to
     * move satisfied in step 1 are selected.
     */
-    chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
+    chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
   }
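The three passes above schedule the most local moves first: same node group (when the topology has node groups), then same rack, then any remaining pair, so cross-rack traffic is only scheduled for what the closer passes could not satisfy. A simplified standalone sketch of this progressive matching (hypothetical names; one pairing per node, unlike the real scheduler, and each node is just a {rack, name} pair):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.BiPredicate;

    class MatchOrderSketch {
      static List<String> pair(List<String[]> sources, List<String[]> targets) {
        BiPredicate<String[], String[]> sameRack = (s, t) -> s[0].equals(t[0]);
        BiPredicate<String[], String[]> any = (s, t) -> true;
        List<String> pairs = new ArrayList<>();
        // run the most local matcher first, then fall back to "any"
        for (BiPredicate<String[], String[]> matcher : List.of(sameRack, any)) {
          for (Iterator<String[]> si = sources.iterator(); si.hasNext(); ) {
            String[] s = si.next();
            for (Iterator<String[]> ti = targets.iterator(); ti.hasNext(); ) {
              String[] t = ti.next();
              if (matcher.test(s, t)) {
                pairs.add(s[1] + " -> " + t[1]);
                ti.remove();   // each node is scheduled once in this sketch
                si.remove();
                break;
              }
            }
          }
        }
        return pairs;
      }

      public static void main(String[] args) {
        List<String[]> srcs = new ArrayList<>(List.of(
            new String[]{"rackA", "s1"}, new String[]{"rackB", "s2"}));
        List<String[]> tgts = new ArrayList<>(List.of(
            new String[]{"rackB", "t1"}, new String[]{"rackC", "t2"}));
        System.out.println(pair(srcs, tgts)); // [s2 -> t1, s1 -> t2]
      }
    }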
   /**
@@ -1068,13 +1180,14 @@ public class Balancer {
    * datanodes or the candidates are source nodes with (utilization > Avg), and
    * the others are target nodes with (utilization < Avg).
    */
-  private <D extends BalancerDatanode, C extends BalancerDatanode> void
-      chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
+  private <G extends BalancerDatanode.StorageGroup,
+           C extends BalancerDatanode.StorageGroup>
+      void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
       Matcher matcher) {
-    for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
-      final D datanode = i.next();
-      for(; chooseForOneDatanode(datanode, candidates, matcher); );
-      if (!datanode.hasSpaceForScheduling()) {
+    for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
+      final G g = i.next();
+      for(; choose4One(g, candidates, matcher); );
+      if (!g.hasSpaceForScheduling()) {
         i.remove();
       }
     }
@@ -1084,18 +1197,19 @@ public class Balancer {
    * For the given datanode, choose a candidate and then schedule it.
    * @return true if a candidate is chosen; false if no candidate is chosen.
    */
-  private <C extends BalancerDatanode> boolean chooseForOneDatanode(
-      BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
+  private <C extends BalancerDatanode.StorageGroup>
+      boolean choose4One(BalancerDatanode.StorageGroup g,
+          Collection<C> candidates, Matcher matcher) {
     final Iterator<C> i = candidates.iterator();
-    final C chosen = chooseCandidate(dn, i, matcher);
-
+    final C chosen = chooseCandidate(g, i, matcher);
+
     if (chosen == null) {
       return false;
     }
-    if (dn instanceof Source) {
-      matchSourceWithTargetToMove((Source)dn, chosen);
+    if (g instanceof Source) {
+      matchSourceWithTargetToMove((Source)g, chosen);
     } else {
-      matchSourceWithTargetToMove((Source)chosen, dn);
+      matchSourceWithTargetToMove((Source)chosen, g);
     }
     if (!chosen.hasSpaceForScheduling()) {
       i.remove();
@@ -1103,27 +1217,28 @@ public class Balancer {
     return true;
   }

-  private void matchSourceWithTargetToMove(
-      Source source, BalancerDatanode target) {
+  private void matchSourceWithTargetToMove(Source source,
+      BalancerDatanode.StorageGroup target) {
     long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
-    NodeTask nodeTask = new NodeTask(target, size);
-    source.addNodeTask(nodeTask);
-    target.incScheduledSize(nodeTask.getSize());
+    final Task task = new Task(target, size);
+    source.addTask(task);
+    target.incScheduledSize(task.size);
     sources.add(source);
     targets.add(target);
     LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-        +source.datanode.getName() + " to " + target.datanode.getName());
+        + source.getDisplayName() + " to " + target.getDisplayName());
   }
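The scheduled size is capped by whichever side has less remaining quota, and both counters advance together so neither side can be over-committed. A minimal sketch of this step (hypothetical names and quotas, not the patch's Source/StorageGroup types):

    class ScheduleSketch {
      long sourceMaxToMove = 30L << 30, sourceScheduled = 0L; // 30 GB quota
      long targetMaxToMove = 10L << 30, targetScheduled = 0L; // 10 GB quota

      long schedule() {
        // cap the task by the smaller remaining quota of the two sides
        final long size = Math.min(sourceMaxToMove - sourceScheduled,
            targetMaxToMove - targetScheduled);
        sourceScheduled += size;
        targetScheduled += size;
        return size; // here: 10 GB, the target's remaining quota
      }
    }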
   /** Choose a candidate for the given datanode.
    */
-  private <D extends BalancerDatanode, C extends BalancerDatanode>
-      C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
-    if (dn.hasSpaceForScheduling()) {
+  private <G extends BalancerDatanode.StorageGroup,
+           C extends BalancerDatanode.StorageGroup>
+      C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
+    if (g.hasSpaceForScheduling()) {
       for(; candidates.hasNext(); ) {
         final C c = candidates.next();
         if (!c.hasSpaceForScheduling()) {
           candidates.remove();
-        } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
+        } else if (matcher.match(cluster, g.getDatanode(), c.getDatanode())) {
           return c;
         }
       }
@@ -1177,9 +1292,10 @@ public class Balancer {
       boolean shouldWait;
       do {
         shouldWait = false;
-        for (BalancerDatanode target : targets) {
-          if (!target.isPendingQEmpty()) {
+        for (BalancerDatanode.StorageGroup target : targets) {
+          if (!target.getBalancerDatanode().isPendingQEmpty()) {
             shouldWait = true;
+            break;
           }
         }
         if (shouldWait) {
@@ -1248,12 +1364,15 @@ public class Balancer {
    * 3. doing the move does not reduce the number of racks that the block has
    */
   private boolean isGoodBlockCandidate(Source source,
-      BalancerDatanode target, BalancerBlock block) {
+      BalancerDatanode.StorageGroup target, BalancerBlock block) {
+    if (source.storageType != target.storageType) {
+      return false;
+    }
     // check if the block is moved or not
     if (movedBlocks.contains(block)) {
       return false;
     }
-    if (block.isLocatedOnDatanode(target)) {
+    if (block.isLocatedOn(target)) {
       return false;
     }
     if (cluster.isNodeGroupAware() &&
@@ -1268,8 +1387,8 @@ public class Balancer {
     } else {
       boolean notOnSameRack = true;
       synchronized (block) {
-        for (BalancerDatanode loc : block.locations) {
-          if (cluster.isOnSameRack(loc.datanode, target.datanode)) {
+        for (BalancerDatanode.StorageGroup loc : block.locations) {
+          if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
             notOnSameRack = false;
             break;
           }
@@ -1280,9 +1399,9 @@ public class Balancer {
         goodBlock = true;
       } else {
         // good if source is on the same rack as one of the replicas
-        for (BalancerDatanode loc : block.locations) {
+        for (BalancerDatanode.StorageGroup loc : block.locations) {
           if (loc != source &&
-              cluster.isOnSameRack(loc.datanode, source.datanode)) {
+              cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
             goodBlock = true;
             break;
           }
@@ -1303,25 +1422,26 @@ public class Balancer {
    * @return true if any replica (other than source) is on the same node
    * group as the target
    */
-  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
       BalancerBlock block, Source source) {
-    for (BalancerDatanode loc : block.locations) {
+    final DatanodeInfo targetDn = target.getDatanode();
+    for (BalancerDatanode.StorageGroup loc : block.locations) {
       if (loc != source &&
-          cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
-        return true;
-      }
+          cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
+        return true;
+      }
     }
     return false;
   }
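Rule 3 above keeps the number of racks holding the block from shrinking: the move is safe if the target rack holds no replica yet, or if the source's rack keeps another replica behind. A simplified sketch of the rack test (hypothetical names; it ignores the node-group check and treats the moving replica separately from the others):

    import java.util.List;

    class RackRuleSketch {
      // otherReplicaRacks: racks of the block's replicas, excluding the copy
      // that is being moved off the source
      static boolean isGoodCandidate(String sourceRack, String targetRack,
          List<String> otherReplicaRacks) {
        if (!otherReplicaRacks.contains(targetRack)) {
          // target adds a rack the block does not yet live on: count cannot drop
          return true;
        }
        // target duplicates a rack, so the source rack must keep another replica
        return otherReplicaRacks.contains(sourceRack);
      }

      public static void main(String[] args) {
        System.out.println(isGoodCandidate("rackA", "rackB", List.of("rackC"))); // true
        System.out.println(isGoodCandidate("rackA", "rackC", List.of("rackC"))); // false
      }
    }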
+    this.underUtilized.clear();
+    this.storageGroupMap.clear();
     this.sources.clear();
     this.targets.clear();
     this.policy.reset();
@@ -1341,32 +1461,6 @@ public class Balancer {
       }
     }
   }
-
-  /* Return true if the given datanode is overUtilized */
-  private boolean isOverUtilized(BalancerDatanode datanode) {
-    return datanode.utilization > (policy.getAvgUtilization()+threshold);
-  }
-
-  /* Return true if the given datanode is above or equal to average utilized
-   * but not overUtilized */
-  private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
-    final double avg = policy.getAvgUtilization();
-    return (datanode.utilization <= (avg+threshold))
-        && (datanode.utilization >= avg);
-  }
-
-  /* Return true if the given datanode is underUtilized */
-  private boolean isUnderUtilized(BalancerDatanode datanode) {
-    return datanode.utilization < (policy.getAvgUtilization()-threshold);
-  }
-
-  /* Return true if the given datanode is below average utilized
-   * but not underUtilized */
-  private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
-    final double avg = policy.getAvgUtilization();
-    return (datanode.utilization >= (avg-threshold))
-        && (datanode.utilization <= avg);
-  }

   // Exit status
   enum ReturnStatus {
@@ -1394,7 +1488,8 @@ public class Balancer {
     /* get all live datanodes of a cluster and their disk usage
      * decide the number of bytes needed to be moved */
-    final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
+    final long bytesLeftToMove = init(
+        nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE));
     if (bytesLeftToMove == 0) {
       System.out.println("The cluster is balanced. Exiting...");
       return ReturnStatus.SUCCESS;
@@ -1408,7 +1503,7 @@ public class Balancer {
      * in this iteration. Maximum bytes to be moved per node is
      * min(one bandwidth's worth of bytes, MAX_SIZE_TO_MOVE). */
-    final long bytesToMove = chooseNodes();
+    final long bytesToMove = chooseStorageGroups();
     if (bytesToMove == 0) {
       System.out.println("No block can be moved. Exiting...");
       return ReturnStatus.NO_MOVE_BLOCK;
@@ -1526,21 +1621,101 @@ public class Balancer {
   }

   static class Parameters {
-    static final Parameters DEFALUT = new Parameters(
-        BalancingPolicy.Node.INSTANCE, 10.0);
+    static final Parameters DEFAULT = new Parameters(
+        BalancingPolicy.Node.INSTANCE, 10.0,
+        Collections.<String> emptySet(), Collections.<String> emptySet());

     final BalancingPolicy policy;
     final double threshold;
+    // exclude the nodes in this set from balancing operations
+    Set<String> nodesToBeExcluded;
+    // include only these nodes in balancing operations
+    Set<String> nodesToBeIncluded;

-    Parameters(BalancingPolicy policy, double threshold) {
+    Parameters(BalancingPolicy policy, double threshold,
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
       this.policy = policy;
       this.threshold = threshold;
+      this.nodesToBeExcluded = nodesToBeExcluded;
+      this.nodesToBeIncluded = nodesToBeIncluded;
     }

     @Override
     public String toString() {
       return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
-          + "[" + policy + ", threshold=" + threshold + "]";
+          + "[" + policy + ", threshold=" + threshold
+          + ", number of nodes to be excluded = " + nodesToBeExcluded.size()
+          + ", number of nodes to be included = " + nodesToBeIncluded.size() + "]";
     }
   }
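Parameters is a small immutable-style value object with a shared DEFAULT whose include and exclude sets are empty, meaning every node participates. A standalone simplified sketch of the same pattern (hypothetical class; the policy field is omitted):

    import java.util.Collections;
    import java.util.Set;

    final class ParametersSketch {
      static final ParametersSketch DEFAULT = new ParametersSketch(
          10.0, Collections.<String>emptySet(), Collections.<String>emptySet());

      final double threshold;
      final Set<String> excluded;  // nodes kept out of balancing
      final Set<String> included;  // if non-empty, the only nodes balanced

      ParametersSketch(double threshold, Set<String> excluded, Set<String> included) {
        this.threshold = threshold;
        this.excluded = excluded;
        this.included = included;
      }

      @Override
      public String toString() {
        return "threshold=" + threshold + ", excluded=" + excluded.size()
            + ", included=" + included.size();
      }
    }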
+
+  static class Util {
+
+    /**
+     * @return true if the given datanode is in excludedNodes.
+     */
+    static boolean shouldBeExcluded(Set<String> excludedNodes, DatanodeInfo datanode) {
+      return isIn(excludedNodes, datanode);
+    }
+
+    /**
+     * @return true if includedNodes is empty or the given datanode is in
+     *         includedNodes.
+     */
+    static boolean shouldBeIncluded(Set<String> includedNodes, DatanodeInfo datanode) {
+      return (includedNodes.isEmpty() ||
+          isIn(includedNodes, datanode));
+    }
+
+    /**
+     * Matching is checked using the host name and the IP address, both with
+     * and without the port number.
+     * @return true if the datanode's transfer address matches the set of nodes.
+     */
+    private static boolean isIn(Set<String> datanodeSet, DatanodeInfo datanode) {
+      return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) ||
+          isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) ||
+          isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort());
+    }
+
+    /**
+     * @return true if nodes contains host or host:port.
+     */
+    private static boolean isIn(Set<String> nodes, String host, int port) {
+      if (host == null) {
+        return false;
+      }
+      return (nodes.contains(host) || nodes.contains(host + ":" + port));
+    }
+
+    /**
+     * Parse a comma separated string to obtain a set of host names.
+     */
+    static Set<String> parseHostList(String string) {
+      String[] addrs = StringUtils.getTrimmedStrings(string);
+      return new HashSet<String>(Arrays.asList(addrs));
+    }
+
+    /**
+     * Read a set of host names from a file.
+     */
+    static Set<String> getHostListFromFile(String fileName) {
+      Set<String> nodes = new HashSet<String>();
+      try {
+        HostsFileReader.readFileToSet("nodes", fileName, nodes);
+        return StringUtils.getTrimmedStrings(nodes);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Unable to open file: " + fileName);
+      }
+    }
+  }

@@ -1578,8 +1753,10 @@ public class Balancer {
     /** parse command line arguments */
     static Parameters parse(String[] args) {
-      BalancingPolicy policy = Parameters.DEFALUT.policy;
-      double threshold = Parameters.DEFALUT.threshold;
+      BalancingPolicy policy = Parameters.DEFAULT.policy;
+      double threshold = Parameters.DEFAULT.threshold;
+      Set<String> nodesToBeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
+      Set<String> nodesToBeIncluded = Parameters.DEFAULT.nodesToBeIncluded;

       if (args != null) {
         try {
@@ -1608,18 +1785,38 @@ public class Balancer {
               System.err.println("Illegal policy name: " + args[i]);
               throw e;
             }
+          } else if ("-exclude".equalsIgnoreCase(args[i])) {
+            i++;
+            if ("-f".equalsIgnoreCase(args[i])) {
+              nodesToBeExcluded = Util.getHostListFromFile(args[++i]);
+            } else {
+              nodesToBeExcluded = Util.parseHostList(args[i]);
+            }
+          } else if ("-include".equalsIgnoreCase(args[i])) {
+            i++;
+            if ("-f".equalsIgnoreCase(args[i])) {
+              nodesToBeIncluded = Util.getHostListFromFile(args[++i]);
+            } else {
+              nodesToBeIncluded = Util.parseHostList(args[i]);
+            }
           } else {
             throw new IllegalArgumentException("args = "
                 + Arrays.toString(args));
           }
         }
+        if (!nodesToBeExcluded.isEmpty() && !nodesToBeIncluded.isEmpty()) {
+          System.err.println(
+              "-exclude and -include options cannot be specified together.");
+          throw new IllegalArgumentException(
+              "-exclude and -include options cannot be specified together.");
+        }
       } catch(RuntimeException e) {
         printUsage(System.err);
         throw e;
       }
     }

-    return new Parameters(policy, threshold);
+    return new Parameters(policy, threshold, nodesToBeExcluded, nodesToBeIncluded);
   }
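With this parsing, host lists can be given inline or read from a file, for example `hdfs balancer -exclude -f /tmp/exclude-hosts.txt` or `hdfs balancer -include dn1.example.com,dn2.example.com` (hypothetical host names and paths), and using both flags at once is rejected. A standalone simplified sketch of the flag handling (plain file reading instead of HostsFileReader; policy and threshold handling omitted):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    class HostFlagSketch {
      // args[i] is either "-f" followed by a file name, or a comma separated list
      static Set<String> parseHostArg(String[] args, int i) throws IOException {
        if ("-f".equalsIgnoreCase(args[i])) {
          return new HashSet<String>(Files.readAllLines(Paths.get(args[i + 1])));
        }
        return new HashSet<String>(Arrays.asList(args[i].split(",")));
      }

      public static void main(String[] args) throws IOException {
        Set<String> excluded = parseHostArg(
            new String[]{"dn1.example.com,dn2.example.com"}, 0);
        Set<String> included = new HashSet<String>();
        if (!excluded.isEmpty() && !included.isEmpty()) {
          throw new IllegalArgumentException(
              "-exclude and -include options cannot be specified together.");
        }
        System.out.println(excluded); // [dn1.example.com, dn2.example.com]
      }
    }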
   private static void printUsage(PrintStream out) {

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java?rev=1615020&r1=1615019&r2=1615020&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java Fri Aug  1 01:29:49 2014
@@ -18,7 +18,11 @@
 package org.apache.hadoop.hdfs.server.balancer;

 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.hdfs.util.EnumDoubles;

 /**
  * Balancing policy.
@@ -28,31 +32,43 @@ import org.apache.hadoop.hdfs.protocol.D
  */
 @InterfaceAudience.Private
 abstract class BalancingPolicy {
-  long totalCapacity;
-  long totalUsedSpace;
-  private double avgUtilization;
+  final EnumCounters<StorageType> totalCapacities
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumCounters<StorageType> totalUsedSpaces
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumDoubles<StorageType> avgUtilizations
+      = new EnumDoubles<StorageType>(StorageType.class);

   void reset() {
-    totalCapacity = 0L;
-    totalUsedSpace = 0L;
-    avgUtilization = 0.0;
+    totalCapacities.reset();
+    totalUsedSpaces.reset();
+    avgUtilizations.reset();
   }

   /** Get the policy name. */
   abstract String getName();

   /** Accumulate used space and capacity. */
-  abstract void accumulateSpaces(DatanodeInfo d);
+  abstract void accumulateSpaces(DatanodeStorageReport r);

   void initAvgUtilization() {
-    this.avgUtilization = totalUsedSpace*100.0/totalCapacity;
+    for(StorageType t : StorageType.asList()) {
+      final long capacity = totalCapacities.get(t);
+      if (capacity > 0L) {
+        final double avg = totalUsedSpaces.get(t)*100.0/capacity;
+        avgUtilizations.set(t, avg);
+      }
+    }
   }

-  double getAvgUtilization() {
-    return avgUtilization;
+
+  double getAvgUtilization(StorageType t) {
+    return avgUtilizations.get(t);
   }

-  /** Return the utilization of a datanode */
-  abstract double getUtilization(DatanodeInfo d);
+  /** @return the utilization of a particular storage type of a datanode;
+   *          or null if the datanode does not have such storage type.
+   */
+  abstract Double getUtilization(DatanodeStorageReport r, StorageType t);
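initAvgUtilization above computes one average per storage type and simply skips types with zero total capacity, so a storage type absent from the cluster never causes a division by zero or a bogus 0% average. A standalone sketch using a plain EnumMap in place of the EnumCounters/EnumDoubles helpers (hypothetical simplified types; SSD/DISK stand in for StorageType values):

    import java.util.EnumMap;
    import java.util.Map;

    class AvgUtilizationSketch {
      enum Storage { DISK, SSD }

      public static void main(String[] args) {
        Map<Storage, Long> capacity = new EnumMap<>(Storage.class);
        Map<Storage, Long> used = new EnumMap<>(Storage.class);
        capacity.put(Storage.DISK, 1000L); used.put(Storage.DISK, 700L);
        capacity.put(Storage.SSD, 0L);     used.put(Storage.SSD, 0L);

        Map<Storage, Double> avg = new EnumMap<>(Storage.class);
        for (Storage t : Storage.values()) {
          final long c = capacity.get(t);
          if (c > 0L) {                              // skip types with no capacity,
            avg.put(t, used.get(t) * 100.0 / c);     // as initAvgUtilization does
          }
        }
        System.out.println(avg); // {DISK=70.0}, no entry for SSD
      }
    }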

   @Override
   public String toString() {
@@ -84,14 +100,25 @@ abstract class BalancingPolicy {
     }

     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getDfsUsed();
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getDfsUsed());
+      }
     }

     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getDfsUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long dfsUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          dfsUsed += s.getDfsUsed();
+        }
+      }
+      return capacity == 0L? null: dfsUsed*100.0/capacity;
     }
   }

@@ -108,14 +135,25 @@ abstract class BalancingPolicy {
     }

     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getBlockPoolUsed();
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getBlockPoolUsed());
+      }
     }

     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getBlockPoolUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long blockPoolUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          blockPoolUsed += s.getBlockPoolUsed();
+        }
+      }
+      return capacity == 0L? null: blockPoolUsed*100.0/capacity;
     }
   }
 }
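Both policies return a boxed Double so that "this datanode has no such storage type" (null) stays distinguishable from "0% utilized", which is what the caller in init() relies on when it skips missing storage types. A minimal sketch of that contract (hypothetical simplified signature, not the policy classes above):

    class UtilizationContractSketch {
      static Double utilization(long capacity, long used) {
        // null means "no storage of this type", distinct from 0.0
        return capacity == 0L ? null : used * 100.0 / capacity;
      }

      public static void main(String[] args) {
        Double u = utilization(0L, 0L);  // node has no such storage type
        if (u == null) {
          System.out.println("skip this storage type"); // as init() does
        } else {
          System.out.println("utilization = " + u + "%");
        }
      }
    }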