http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 4bb1359..4db09ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -41,7 +41,6 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSLinkResolver; -import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -52,7 +51,6 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.Options.ChecksumOpt; -import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; @@ -88,7 +86,6 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; @@ -114,7 +111,7 @@ public class DistributedFileSystem extends FileSystem { DFSClient dfs; private boolean verifyChecksum = true; - + static{ HdfsConfigurationLoader.init(); } @@ -173,8 +170,8 @@ public class DistributedFileSystem extends FileSystem { public void setWorkingDirectory(Path dir) { String result = fixRelativePart(dir).toUri().getPath(); if (!DFSUtilClient.isValidName(result)) { - throw new IllegalArgumentException("Invalid DFS directory name " + - result); + throw new IllegalArgumentException("Invalid DFS directory name " + + result); } workingDir = fixRelativePart(dir); } @@ -188,7 +185,7 @@ public class DistributedFileSystem extends FileSystem { /** * Checks that the passed URI belongs to this filesystem and returns * just the path component. Expects a URI with an absolute path. - * + * * @param file URI with absolute path * @return path component of {file} * @throws IllegalArgumentException if URI does not belong to this DFS @@ -198,11 +195,11 @@ public class DistributedFileSystem extends FileSystem { String result = file.toUri().getPath(); if (!DFSUtilClient.isValidName(result)) { throw new IllegalArgumentException("Pathname " + result + " from " + - file+" is not a valid DFS filename."); + file+" is not a valid DFS filename."); } return result; } - + @Override public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { @@ -211,9 +208,9 @@ public class DistributedFileSystem extends FileSystem { } return getFileBlockLocations(file.getPath(), start, len); } - + @Override - public BlockLocation[] getFileBlockLocations(Path p, + public BlockLocation[] getFileBlockLocations(Path p, final long start, final long len) throws IOException { statistics.incrementReadOps(1); final Path absF = fixRelativePart(p); @@ -271,7 +268,7 @@ public class DistributedFileSystem extends FileSystem { this.verifyChecksum = verifyChecksum; } - /** + /** * Start the lease recovery of a file * * @param f a file @@ -282,8 +279,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(f); return new FileSystemLinkResolver<Boolean>() { @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Boolean doCall(final Path p) throws IOException{ return dfs.recoverLease(getPathName(p)); } @Override @@ -306,10 +302,9 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FSDataInputStream>() { @Override - public FSDataInputStream doCall(final Path p) - throws IOException, UnresolvedLinkException { + public FSDataInputStream doCall(final Path p) throws IOException { final DFSInputStream dfsis = - dfs.open(getPathName(p), bufferSize, verifyChecksum); + dfs.open(getPathName(p), bufferSize, verifyChecksum); return dfs.createWrappedInputStream(dfsis); } @Override @@ -328,7 +323,7 @@ public class DistributedFileSystem extends FileSystem { /** * Append to an existing file (optional operation). - * + * * @param f the existing file to be appended. * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory * to be present. @@ -358,7 +353,7 @@ public class DistributedFileSystem extends FileSystem { /** * Append to an existing file (optional operation). - * + * * @param f the existing file to be appended. * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory * to be present. @@ -399,9 +394,9 @@ public class DistributedFileSystem extends FileSystem { } /** - * Same as - * {@link #create(Path, FsPermission, boolean, int, short, long, - * Progressable)} with the addition of favoredNodes that is a hint to + * Same as + * {@link #create(Path, FsPermission, boolean, int, short, long, + * Progressable)} with the addition of favoredNodes that is a hint to * where the namenode should place the file blocks. * The favored nodes hint is not persisted in HDFS. Hence it may be honored * at the creation time only. And with favored nodes, blocks will be pinned @@ -413,13 +408,12 @@ public class DistributedFileSystem extends FileSystem { final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final InetSocketAddress[] favoredNodes) - throws IOException { + throws IOException { statistics.incrementWriteOps(1); Path absF = fixRelativePart(f); return new FileSystemLinkResolver<HdfsDataOutputStream>() { @Override - public HdfsDataOutputStream doCall(final Path p) - throws IOException, UnresolvedLinkException { + public HdfsDataOutputStream doCall(final Path p) throws IOException { final DFSOutputStream out = dfs.create(getPathName(f), permission, overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE), @@ -441,21 +435,21 @@ public class DistributedFileSystem extends FileSystem { } }.resolve(this, absF); } - + @Override public FSDataOutputStream create(final Path f, final FsPermission permission, - final EnumSet<CreateFlag> cflags, final int bufferSize, - final short replication, final long blockSize, final Progressable progress, - final ChecksumOpt checksumOpt) throws IOException { + final EnumSet<CreateFlag> cflags, final int bufferSize, + final short replication, final long blockSize, + final Progressable progress, final ChecksumOpt checksumOpt) + throws IOException { statistics.incrementWriteOps(1); Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FSDataOutputStream>() { @Override - public FSDataOutputStream doCall(final Path p) - throws IOException, UnresolvedLinkException { + public FSDataOutputStream doCall(final Path p) throws IOException { final DFSOutputStream dfsos = dfs.create(getPathName(p), permission, - cflags, replication, blockSize, progress, bufferSize, - checksumOpt); + cflags, replication, blockSize, progress, bufferSize, + checksumOpt); return dfs.createWrappedOutputStream(dfsos, statistics); } @Override @@ -469,14 +463,14 @@ public class DistributedFileSystem extends FileSystem { @Override protected HdfsDataOutputStream primitiveCreate(Path f, - FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize, - short replication, long blockSize, Progressable progress, - ChecksumOpt checksumOpt) throws IOException { + FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize, + short replication, long blockSize, Progressable progress, + ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); final DFSOutputStream dfsos = dfs.primitiveCreate( - getPathName(fixRelativePart(f)), - absolutePermission, flag, true, replication, blockSize, - progress, bufferSize, checksumOpt); + getPathName(fixRelativePart(f)), + absolutePermission, flag, true, replication, blockSize, + progress, bufferSize, checksumOpt); return dfs.createWrappedOutputStream(dfsos, statistics); } @@ -495,10 +489,9 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FSDataOutputStream>() { @Override - public FSDataOutputStream doCall(final Path p) throws IOException, - UnresolvedLinkException { + public FSDataOutputStream doCall(final Path p) throws IOException { final DFSOutputStream dfsos = dfs.create(getPathName(p), permission, - flag, false, replication, blockSize, progress, bufferSize, null); + flag, false, replication, blockSize, progress, bufferSize, null); return dfs.createWrappedOutputStream(dfsos, statistics); } @@ -512,15 +505,13 @@ public class DistributedFileSystem extends FileSystem { } @Override - public boolean setReplication(Path src, - final short replication - ) throws IOException { + public boolean setReplication(Path src, final short replication) + throws IOException { statistics.incrementWriteOps(1); Path absF = fixRelativePart(src); return new FileSystemLinkResolver<Boolean>() { @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Boolean doCall(final Path p) throws IOException { return dfs.setReplication(getPathName(p), replication); } @Override @@ -544,8 +535,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(src); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.setStoragePolicy(getPathName(p), policyName); return null; } @@ -571,7 +561,7 @@ public class DistributedFileSystem extends FileSystem { @Override public BlockStoragePolicySpi next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { + throws IOException { return fs.getStoragePolicy(p); } }.resolve(this, absF); @@ -585,7 +575,6 @@ public class DistributedFileSystem extends FileSystem { /** * Deprecated. Prefer {@link FileSystem#getAllStoragePolicies()} - * @return * @throws IOException */ @Deprecated @@ -597,7 +586,7 @@ public class DistributedFileSystem extends FileSystem { /** * Move blocks from srcs to trg and delete srcs afterwards. * The file block sizes must be the same. - * + * * @param trg existing file to append to * @param psrcs list of files (same block size, same replication) * @throws IOException @@ -644,7 +633,7 @@ public class DistributedFileSystem extends FileSystem { } } - + @SuppressWarnings("deprecation") @Override public boolean rename(Path src, Path dst) throws IOException { @@ -662,8 +651,7 @@ public class DistributedFileSystem extends FileSystem { // Keep trying to resolve the destination return new FileSystemLinkResolver<Boolean>() { @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Boolean doCall(final Path p) throws IOException { return dfs.rename(getPathName(source), getPathName(p)); } @Override @@ -676,7 +664,7 @@ public class DistributedFileSystem extends FileSystem { } } - /** + /** * This rename operation is guaranteed to be atomic. */ @SuppressWarnings("deprecation") @@ -695,8 +683,7 @@ public class DistributedFileSystem extends FileSystem { // Keep trying to resolve the destination new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.rename(getPathName(source), getPathName(p), options); return null; } @@ -716,8 +703,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(f); return new FileSystemLinkResolver<Boolean>() { @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Boolean doCall(final Path p) throws IOException { return dfs.truncate(getPathName(p), newLength); } @Override @@ -734,8 +720,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(f); return new FileSystemLinkResolver<Boolean>() { @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Boolean doCall(final Path p) throws IOException { return dfs.delete(getPathName(p), recursive); } @Override @@ -745,15 +730,14 @@ public class DistributedFileSystem extends FileSystem { } }.resolve(this, absF); } - + @Override public ContentSummary getContentSummary(Path f) throws IOException { statistics.incrementReadOps(1); Path absF = fixRelativePart(f); return new FileSystemLinkResolver<ContentSummary>() { @Override - public ContentSummary doCall(final Path p) - throws IOException, UnresolvedLinkException { + public ContentSummary doCall(final Path p) throws IOException { return dfs.getContentSummary(getPathName(p)); } @Override @@ -765,15 +749,15 @@ public class DistributedFileSystem extends FileSystem { } /** Set a directory's quotas - * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long, StorageType) + * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, + * long, long, StorageType) */ public void setQuota(Path src, final long namespaceQuota, final long storagespaceQuota) throws IOException { Path absF = fixRelativePart(src); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.setQuota(getPathName(p), namespaceQuota, storagespaceQuota); return null; } @@ -795,22 +779,21 @@ public class DistributedFileSystem extends FileSystem { * @param quota value of the specific storage type quota to be modified. * Maybe {@link HdfsConstants#QUOTA_RESET} to clear quota by storage type. */ - public void setQuotaByStorageType( - Path src, final StorageType type, final long quota) - throws IOException { + public void setQuotaByStorageType(Path src, final StorageType type, + final long quota) + throws IOException { Path absF = fixRelativePart(src); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.setQuotaByStorageType(getPathName(p), type, quota); return null; } @Override public Void next(final FileSystem fs, final Path p) - throws IOException { - // setQuotaByStorageType is not defined in FileSystem, so we only can resolve - // within this DFS + throws IOException { + // setQuotaByStorageType is not defined in FileSystem, so we only can + // resolve within this DFS return doCall(p); } }.resolve(this, absF); @@ -826,7 +809,7 @@ public class DistributedFileSystem extends FileSystem { if (thisListing == null) { // the directory does not exist throw new FileNotFoundException("File " + p + " does not exist."); } - + HdfsFileStatus[] partialListing = thisListing.getPartialListing(); if (!thisListing.hasMore()) { // got all entries of the directory FileStatus[] stats = new FileStatus[partialListing.length]; @@ -840,30 +823,30 @@ public class DistributedFileSystem extends FileSystem { // The directory size is too big that it needs to fetch more // estimate the total number of entries in the directory int totalNumEntries = - partialListing.length + thisListing.getRemainingEntries(); + partialListing.length + thisListing.getRemainingEntries(); ArrayList<FileStatus> listing = - new ArrayList<FileStatus>(totalNumEntries); + new ArrayList<>(totalNumEntries); // add the first batch of entries to the array list for (HdfsFileStatus fileStatus : partialListing) { listing.add(fileStatus.makeQualified(getUri(), p)); } statistics.incrementLargeReadOps(1); - + // now fetch more entries do { thisListing = dfs.listPaths(src, thisListing.getLastName()); - + if (thisListing == null) { // the directory is deleted throw new FileNotFoundException("File " + p + " does not exist."); } - + partialListing = thisListing.getPartialListing(); for (HdfsFileStatus fileStatus : partialListing) { listing.add(fileStatus.makeQualified(getUri(), p)); } statistics.incrementLargeReadOps(1); } while (thisListing.hasMore()); - + return listing.toArray(new FileStatus[listing.size()]); } @@ -880,8 +863,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(p); return new FileSystemLinkResolver<FileStatus[]>() { @Override - public FileStatus[] doCall(final Path p) - throws IOException, UnresolvedLinkException { + public FileStatus[] doCall(final Path p) throws IOException { return listStatusInternal(p); } @Override @@ -895,18 +877,18 @@ public class DistributedFileSystem extends FileSystem { @Override protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p, final PathFilter filter) - throws IOException { + throws IOException { Path absF = fixRelativePart(p); return new FileSystemLinkResolver<RemoteIterator<LocatedFileStatus>>() { @Override public RemoteIterator<LocatedFileStatus> doCall(final Path p) - throws IOException, UnresolvedLinkException { - return new DirListingIterator<LocatedFileStatus>(p, filter, true); + throws IOException { + return new DirListingIterator<>(p, filter, true); } @Override - public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, final Path p) - throws IOException { + public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, + final Path p) throws IOException { if (fs instanceof DistributedFileSystem) { return ((DistributedFileSystem)fs).listLocatedStatus(p, filter); } @@ -929,19 +911,19 @@ public class DistributedFileSystem extends FileSystem { */ @Override public RemoteIterator<FileStatus> listStatusIterator(final Path p) - throws IOException { + throws IOException { Path absF = fixRelativePart(p); return new FileSystemLinkResolver<RemoteIterator<FileStatus>>() { @Override public RemoteIterator<FileStatus> doCall(final Path p) - throws IOException, UnresolvedLinkException { - return new DirListingIterator<FileStatus>(p, false); + throws IOException { + return new DirListingIterator<>(p, false); } @Override public RemoteIterator<FileStatus> next(final FileSystem fs, final Path p) throws IOException { - return ((DistributedFileSystem)fs).listStatusIterator(p); + return ((DistributedFileSystem)fs).listStatusIterator(p); } }.resolve(this, absF); @@ -950,14 +932,14 @@ public class DistributedFileSystem extends FileSystem { /** * This class defines an iterator that returns * the file status of each file/subdirectory of a directory - * + * * if needLocation, status contains block location if it is a file * throws a RuntimeException with the error as its cause. - * + * * @param <T> the type of the file status */ private class DirListingIterator<T extends FileStatus> - implements RemoteIterator<T> { + implements RemoteIterator<T> { private DirectoryListing thisListing; private int i; private Path p; @@ -999,21 +981,21 @@ public class DistributedFileSystem extends FileSystem { } else { next = (T)fileStat.makeQualified(getUri(), p); } - // apply filter if not null + // apply filter if not null if (filter == null || filter.accept(next.getPath())) { curStat = next; } } return curStat != null; } - + /** Check if there is a next item before applying the given filter */ private boolean hasNextNoFilter() throws IOException { if (thisListing == null) { return false; } if (i >= thisListing.getPartialListing().length - && thisListing.hasMore()) { + && thisListing.hasMore()) { // current listing is exhausted & fetch a new listing thisListing = dfs.listPaths(src, thisListing.getLastName(), needLocation); @@ -1032,11 +1014,11 @@ public class DistributedFileSystem extends FileSystem { T tmp = curStat; curStat = null; return tmp; - } + } throw new java.util.NoSuchElementException("No more entry in " + p); } } - + /** * Create a directory, only when the parent directories exist. * @@ -1044,7 +1026,7 @@ public class DistributedFileSystem extends FileSystem { * the permission is applied. * * @param f The path to create - * @param permission The permission. See FsPermission#applyUMask for + * @param permission The permission. See FsPermission#applyUMask for * details about how this is used to calculate the * effective permission. */ @@ -1059,7 +1041,7 @@ public class DistributedFileSystem extends FileSystem { * the permission is applied. * * @param f The path to create - * @param permission The permission. See FsPermission#applyUMask for + * @param permission The permission. See FsPermission#applyUMask for * details about how this is used to calculate the * effective permission. */ @@ -1074,8 +1056,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(f); return new FileSystemLinkResolver<Boolean>() { @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Boolean doCall(final Path p) throws IOException { return dfs.mkdirs(getPathName(p), permission, createParent); } @@ -1096,12 +1077,12 @@ public class DistributedFileSystem extends FileSystem { @SuppressWarnings("deprecation") @Override protected boolean primitiveMkdir(Path f, FsPermission absolutePermission) - throws IOException { + throws IOException { statistics.incrementWriteOps(1); return dfs.primitiveMkdir(getPathName(f), absolutePermission); } - + @Override public void close() throws IOException { try { @@ -1176,7 +1157,7 @@ public class DistributedFileSystem extends FileSystem { /** * Returns count of blocks with no good replicas left. Normally should be * zero. - * + * * @throws IOException */ public long getMissingBlocksCount() throws IOException { @@ -1195,7 +1176,7 @@ public class DistributedFileSystem extends FileSystem { /** * Returns count of blocks with one of more replica missing. - * + * * @throws IOException */ public long getUnderReplicatedBlocksCount() throws IOException { @@ -1204,7 +1185,7 @@ public class DistributedFileSystem extends FileSystem { /** * Returns count of blocks with at least one replica marked corrupt. - * + * * @throws IOException */ public long getCorruptBlocksCount() throws IOException { @@ -1213,7 +1194,7 @@ public class DistributedFileSystem extends FileSystem { @Override public RemoteIterator<Path> listCorruptFileBlocks(Path path) - throws IOException { + throws IOException { return new CorruptFileBlockIterator(dfs, path); } @@ -1223,25 +1204,25 @@ public class DistributedFileSystem extends FileSystem { } /** @return datanode statistics for the given type. */ - public DatanodeInfo[] getDataNodeStats(final DatanodeReportType type - ) throws IOException { + public DatanodeInfo[] getDataNodeStats(final DatanodeReportType type) + throws IOException { return dfs.datanodeReport(type); } /** * Enter, leave or get safe mode. - * + * * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode( * HdfsConstants.SafeModeAction,boolean) */ - public boolean setSafeMode(HdfsConstants.SafeModeAction action) - throws IOException { + public boolean setSafeMode(HdfsConstants.SafeModeAction action) + throws IOException { return setSafeMode(action, false); } /** * Enter, leave or get safe mode. - * + * * @param action * One of SafeModeAction.ENTER, SafeModeAction.LEAVE and * SafeModeAction.GET @@ -1260,7 +1241,7 @@ public class DistributedFileSystem extends FileSystem { * * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#saveNamespace() */ - public void saveNamespace() throws AccessControlException, IOException { + public void saveNamespace() throws IOException { dfs.saveNamespace(); } @@ -1270,24 +1251,23 @@ public class DistributedFileSystem extends FileSystem { * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits() * @return the transaction ID of the newly created segment */ - public long rollEdits() throws AccessControlException, IOException { + public long rollEdits() throws IOException { return dfs.rollEdits(); } /** * enable/disable/check restoreFaileStorage - * + * * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#restoreFailedStorage(String arg) */ - public boolean restoreFailedStorage(String arg) - throws AccessControlException, IOException { + public boolean restoreFailedStorage(String arg) throws IOException { return dfs.restoreFailedStorage(arg); } - + /** - * Refreshes the list of hosts and excluded hosts from the configured - * files. + * Refreshes the list of hosts and excluded hosts from the configured + * files. */ public void refreshNodes() throws IOException { dfs.refreshNodes(); @@ -1310,7 +1290,7 @@ public class DistributedFileSystem extends FileSystem { } /* - * Requests the namenode to dump data strcutures into specified + * Requests the namenode to dump data strcutures into specified * file. */ public void metaSave(String pathname) throws IOException { @@ -1332,8 +1312,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FileStatus>() { @Override - public FileStatus doCall(final Path p) throws IOException, - UnresolvedLinkException { + public FileStatus doCall(final Path p) throws IOException { HdfsFileStatus fi = dfs.getFileInfo(getPathName(p)); if (fi != null) { return fi.makeQualified(getUri(), p); @@ -1352,10 +1331,7 @@ public class DistributedFileSystem extends FileSystem { @SuppressWarnings("deprecation") @Override public void createSymlink(final Path target, final Path link, - final boolean createParent) throws AccessControlException, - FileAlreadyExistsException, FileNotFoundException, - ParentNotDirectoryException, UnsupportedFileSystemException, - IOException { + final boolean createParent) throws IOException { if (!FileSystem.areSymlinksEnabled()) { throw new UnsupportedOperationException("Symlinks not supported"); } @@ -1363,14 +1339,12 @@ public class DistributedFileSystem extends FileSystem { final Path absF = fixRelativePart(link); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) throws IOException, - UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.createSymlink(target.toString(), getPathName(p), createParent); return null; } @Override - public Void next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { + public Void next(final FileSystem fs, final Path p) throws IOException { fs.createSymlink(target, p, createParent); return null; } @@ -1383,15 +1357,12 @@ public class DistributedFileSystem extends FileSystem { } @Override - public FileStatus getFileLinkStatus(final Path f) - throws AccessControlException, FileNotFoundException, - UnsupportedFileSystemException, IOException { + public FileStatus getFileLinkStatus(final Path f) throws IOException { statistics.incrementReadOps(1); final Path absF = fixRelativePart(f); FileStatus status = new FileSystemLinkResolver<FileStatus>() { @Override - public FileStatus doCall(final Path p) throws IOException, - UnresolvedLinkException { + public FileStatus doCall(final Path p) throws IOException { HdfsFileStatus fi = dfs.getFileLinkInfo(getPathName(p)); if (fi != null) { return fi.makeQualified(getUri(), p); @@ -1401,7 +1372,7 @@ public class DistributedFileSystem extends FileSystem { } @Override public FileStatus next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { + throws IOException { return fs.getFileLinkStatus(p); } }.resolve(this, absF); @@ -1415,14 +1386,12 @@ public class DistributedFileSystem extends FileSystem { } @Override - public Path getLinkTarget(final Path f) throws AccessControlException, - FileNotFoundException, UnsupportedFileSystemException, IOException { + public Path getLinkTarget(final Path f) throws IOException { statistics.incrementReadOps(1); final Path absF = fixRelativePart(f); return new FileSystemLinkResolver<Path>() { @Override - public Path doCall(final Path p) throws IOException, - UnresolvedLinkException { + public Path doCall(final Path p) throws IOException { HdfsFileStatus fi = dfs.getFileLinkInfo(getPathName(p)); if (fi != null) { return fi.makeQualified(getUri(), p).getSymlink(); @@ -1431,8 +1400,7 @@ public class DistributedFileSystem extends FileSystem { } } @Override - public Path next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { + public Path next(final FileSystem fs, final Path p) throws IOException { return fs.getLinkTarget(p); } }.resolve(this, absF); @@ -1454,8 +1422,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FileChecksum>() { @Override - public FileChecksum doCall(final Path p) - throws IOException, UnresolvedLinkException { + public FileChecksum doCall(final Path p) throws IOException { return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE); } @@ -1474,8 +1441,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FileChecksum>() { @Override - public FileChecksum doCall(final Path p) - throws IOException, UnresolvedLinkException { + public FileChecksum doCall(final Path p) throws IOException { return dfs.getFileChecksum(getPathName(p), length); } @@ -1483,11 +1449,11 @@ public class DistributedFileSystem extends FileSystem { public FileChecksum next(final FileSystem fs, final Path p) throws IOException { if (fs instanceof DistributedFileSystem) { - return ((DistributedFileSystem) fs).getFileChecksum(p, length); + return fs.getFileChecksum(p, length); } else { throw new UnsupportedFileSystemException( "getFileChecksum(Path, long) is not supported by " - + fs.getClass().getSimpleName()); + + fs.getClass().getSimpleName()); } } }.resolve(this, absF); @@ -1495,13 +1461,12 @@ public class DistributedFileSystem extends FileSystem { @Override public void setPermission(Path p, final FsPermission permission - ) throws IOException { + ) throws IOException { statistics.incrementWriteOps(1); Path absF = fixRelativePart(p); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.setPermission(getPathName(p), permission); return null; } @@ -1516,8 +1481,8 @@ public class DistributedFileSystem extends FileSystem { } @Override - public void setOwner(Path p, final String username, final String groupname - ) throws IOException { + public void setOwner(Path p, final String username, final String groupname) + throws IOException { if (username == null && groupname == null) { throw new IOException("username == null && groupname == null"); } @@ -1525,8 +1490,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(p); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.setOwner(getPathName(p), username, groupname); return null; } @@ -1541,14 +1505,13 @@ public class DistributedFileSystem extends FileSystem { } @Override - public void setTimes(Path p, final long mtime, final long atime - ) throws IOException { + public void setTimes(Path p, final long mtime, final long atime) + throws IOException { statistics.incrementWriteOps(1); Path absF = fixRelativePart(p); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.setTimes(getPathName(p), mtime, atime); return null; } @@ -1561,7 +1524,7 @@ public class DistributedFileSystem extends FileSystem { } }.resolve(this, absF); } - + @Override protected int getDefaultPort() { @@ -1571,9 +1534,7 @@ public class DistributedFileSystem extends FileSystem { @Override public Token<DelegationTokenIdentifier> getDelegationToken(String renewer) throws IOException { - Token<DelegationTokenIdentifier> result = - dfs.getDelegationToken(renewer == null ? null : new Text(renewer)); - return result; + return dfs.getDelegationToken(renewer == null ? null : new Text(renewer)); } /** @@ -1598,7 +1559,7 @@ public class DistributedFileSystem extends FileSystem { public String getCanonicalServiceName() { return dfs.getCanonicalServiceName(); } - + @Override protected URI canonicalizeUri(URI uri) { if (HAUtilClient.isLogicalUri(getConf(), uri)) { @@ -1613,7 +1574,7 @@ public class DistributedFileSystem extends FileSystem { /** * Utility function that returns if the NameNode is in safemode or not. In HA * mode, this API will return only ActiveNN's safemode status. - * + * * @return true if NameNode is in safemode, false otherwise. * @throws IOException * when there is an issue communicating with the NameNode @@ -1627,8 +1588,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(path); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.allowSnapshot(getPathName(p)); return null; } @@ -1648,14 +1608,13 @@ public class DistributedFileSystem extends FileSystem { } }.resolve(this, absF); } - + /** @see HdfsAdmin#disallowSnapshot(Path) */ public void disallowSnapshot(final Path path) throws IOException { Path absF = fixRelativePart(path); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.disallowSnapshot(getPathName(p)); return null; } @@ -1675,15 +1634,14 @@ public class DistributedFileSystem extends FileSystem { } }.resolve(this, absF); } - + @Override - public Path createSnapshot(final Path path, final String snapshotName) + public Path createSnapshot(final Path path, final String snapshotName) throws IOException { Path absF = fixRelativePart(path); return new FileSystemLinkResolver<Path>() { @Override - public Path doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Path doCall(final Path p) throws IOException { return new Path(dfs.createSnapshot(getPathName(p), snapshotName)); } @@ -1701,15 +1659,14 @@ public class DistributedFileSystem extends FileSystem { } }.resolve(this, absF); } - + @Override public void renameSnapshot(final Path path, final String snapshotOldName, final String snapshotNewName) throws IOException { Path absF = fixRelativePart(path); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.renameSnapshot(getPathName(p), snapshotOldName, snapshotNewName); return null; } @@ -1729,7 +1686,7 @@ public class DistributedFileSystem extends FileSystem { } }.resolve(this, absF); } - + /** * @return All the snapshottable directories * @throws IOException @@ -1738,15 +1695,14 @@ public class DistributedFileSystem extends FileSystem { throws IOException { return dfs.getSnapshottableDirListing(); } - + @Override public void deleteSnapshot(final Path snapshotDir, final String snapshotName) throws IOException { Path absF = fixRelativePart(snapshotDir); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.deleteSnapshot(getPathName(p), snapshotName); return null; } @@ -1770,7 +1726,7 @@ public class DistributedFileSystem extends FileSystem { /** * Get the difference between two snapshots, or between a snapshot and the * current tree of a directory. - * + * * @see DFSClient#getSnapshotDiffReport(String, String, String) */ public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir, @@ -1778,8 +1734,7 @@ public class DistributedFileSystem extends FileSystem { Path absF = fixRelativePart(snapshotDir); return new FileSystemLinkResolver<SnapshotDiffReport>() { @Override - public SnapshotDiffReport doCall(final Path p) - throws IOException, UnresolvedLinkException { + public SnapshotDiffReport doCall(final Path p) throws IOException { return dfs.getSnapshotDiffReport(getPathName(p), fromSnapshot, toSnapshot); } @@ -1799,21 +1754,20 @@ public class DistributedFileSystem extends FileSystem { } }.resolve(this, absF); } - + /** * Get the close status of a file * @param src The path to the file * * @return return true if file is closed * @throws FileNotFoundException if the file does not exist. - * @throws IOException If an I/O error occurred + * @throws IOException If an I/O error occurred */ public boolean isFileClosed(final Path src) throws IOException { Path absF = fixRelativePart(src); return new FileSystemLinkResolver<Boolean>() { @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { + public Boolean doCall(final Path p) throws IOException { return dfs.isFileClosed(getPathName(p)); } @@ -1841,7 +1795,7 @@ public class DistributedFileSystem extends FileSystem { /** * Add a new CacheDirective. - * + * * @param info Information about a directive to add. * @param flags {@link CacheFlag}s to use for this operation. * @return the ID of the directive that was created. @@ -1868,7 +1822,7 @@ public class DistributedFileSystem extends FileSystem { /** * Modify a CacheDirective. - * + * * @param info Information about the directive to modify. You must set the ID * to indicate which CacheDirective you want to modify. * @param flags {@link CacheFlag}s to use for this operation. @@ -1886,7 +1840,7 @@ public class DistributedFileSystem extends FileSystem { /** * Remove a CacheDirectiveInfo. - * + * * @param id identifier of the CacheDirectiveInfo to remove * @throws IOException if the directive could not be removed */ @@ -1894,10 +1848,10 @@ public class DistributedFileSystem extends FileSystem { throws IOException { dfs.removeCacheDirective(id); } - + /** * List cache directives. Incrementally fetches results from the server. - * + * * @param filter Filter parameters to use when listing the directives, null to * list all directives visible to us. * @return A RemoteIterator which returns CacheDirectiveInfo objects. @@ -1940,7 +1894,7 @@ public class DistributedFileSystem extends FileSystem { * * @param info * The request to add a cache pool. - * @throws IOException + * @throws IOException * If the request could not be completed. */ public void addCachePool(CachePoolInfo info) throws IOException { @@ -1953,20 +1907,20 @@ public class DistributedFileSystem extends FileSystem { * * @param info * The request to modify a cache pool. - * @throws IOException + * @throws IOException * If the request could not be completed. */ public void modifyCachePool(CachePoolInfo info) throws IOException { CachePoolInfo.validate(info); dfs.modifyCachePool(info); } - + /** * Remove a cache pool. * * @param poolName * Name of the cache pool to remove. - * @throws IOException + * @throws IOException * if the cache pool did not exist, or could not be removed. */ public void removeCachePool(String poolName) throws IOException { @@ -2043,8 +1997,7 @@ public class DistributedFileSystem extends FileSystem { return null; } @Override - public Void next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { + public Void next(final FileSystem fs, final Path p) throws IOException { fs.removeDefaultAcl(p); return null; } @@ -2064,8 +2017,7 @@ public class DistributedFileSystem extends FileSystem { return null; } @Override - public Void next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { + public Void next(final FileSystem fs, final Path p) throws IOException { fs.removeAcl(p); return null; } @@ -2076,7 +2028,8 @@ public class DistributedFileSystem extends FileSystem { * {@inheritDoc} */ @Override - public void setAcl(Path path, final List<AclEntry> aclSpec) throws IOException { + public void setAcl(Path path, final List<AclEntry> aclSpec) + throws IOException { Path absF = fixRelativePart(path); new FileSystemLinkResolver<Void>() { @Override @@ -2106,20 +2059,19 @@ public class DistributedFileSystem extends FileSystem { } @Override public AclStatus next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { + throws IOException { return fs.getAclStatus(p); } }.resolve(this, absF); } - + /* HDFS only */ public void createEncryptionZone(final Path path, final String keyName) - throws IOException { + throws IOException { Path absF = fixRelativePart(path); new FileSystemLinkResolver<Void>() { @Override - public Void doCall(final Path p) throws IOException, - UnresolvedLinkException { + public Void doCall(final Path p) throws IOException { dfs.createEncryptionZone(getPathName(p), keyName); return null; } @@ -2142,13 +2094,12 @@ public class DistributedFileSystem extends FileSystem { /* HDFS only */ public EncryptionZone getEZForPath(final Path path) - throws IOException { + throws IOException { Preconditions.checkNotNull(path); Path absF = fixRelativePart(path); return new FileSystemLinkResolver<EncryptionZone>() { @Override - public EncryptionZone doCall(final Path p) throws IOException, - UnresolvedLinkException { + public EncryptionZone doCall(final Path p) throws IOException { return dfs.getEZForPath(getPathName(p)); } @@ -2175,7 +2126,7 @@ public class DistributedFileSystem extends FileSystem { } @Override - public void setXAttr(Path path, final String name, final byte[] value, + public void setXAttr(Path path, final String name, final byte[] value, final EnumSet<XAttrSetFlag> flag) throws IOException { Path absF = fixRelativePart(path); new FileSystemLinkResolver<Void>() { @@ -2190,10 +2141,10 @@ public class DistributedFileSystem extends FileSystem { public Void next(final FileSystem fs, final Path p) throws IOException { fs.setXAttr(p, name, value, flag); return null; - } + } }.resolve(this, absF); } - + @Override public byte[] getXAttr(Path path, final String name) throws IOException { final Path absF = fixRelativePart(path); @@ -2203,13 +2154,12 @@ public class DistributedFileSystem extends FileSystem { return dfs.getXAttr(getPathName(p), name); } @Override - public byte[] next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { + public byte[] next(final FileSystem fs, final Path p) throws IOException { return fs.getXAttr(p, name); } }.resolve(this, absF); } - + @Override public Map<String, byte[]> getXAttrs(Path path) throws IOException { final Path absF = fixRelativePart(path); @@ -2220,14 +2170,14 @@ public class DistributedFileSystem extends FileSystem { } @Override public Map<String, byte[]> next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { + throws IOException { return fs.getXAttrs(p); } }.resolve(this, absF); } - + @Override - public Map<String, byte[]> getXAttrs(Path path, final List<String> names) + public Map<String, byte[]> getXAttrs(Path path, final List<String> names) throws IOException { final Path absF = fixRelativePart(path); return new FileSystemLinkResolver<Map<String, byte[]>>() { @@ -2237,15 +2187,15 @@ public class DistributedFileSystem extends FileSystem { } @Override public Map<String, byte[]> next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { + throws IOException { return fs.getXAttrs(p, names); } }.resolve(this, absF); } - + @Override public List<String> listXAttrs(Path path) - throws IOException { + throws IOException { final Path absF = fixRelativePart(path); return new FileSystemLinkResolver<List<String>>() { @Override @@ -2254,7 +2204,7 @@ public class DistributedFileSystem extends FileSystem { } @Override public List<String> next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { + throws IOException { return fs.listXAttrs(p); } }.resolve(this, absF);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java index 7b9e8e3..fe39df6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java @@ -40,7 +40,7 @@ final public class ExtendedBlockId { public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) { return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); } - + public ExtendedBlockId(long blockId, String bpId) { this.blockId = blockId; this.bpId = bpId; @@ -76,7 +76,6 @@ final public class ExtendedBlockId { @Override public String toString() { - return new StringBuilder().append(blockId). - append("_").append(bpId).toString(); + return String.valueOf(blockId) + "_" + bpId; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java index 2eb9d52..fae2cc0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java @@ -79,7 +79,7 @@ public final class ExternalBlockReader implements BlockReader { } @Override - public int available() throws IOException { + public int available() { // We return the amount of bytes between the current offset and the visible // length. Some of the other block readers return a shorter length than // that. The only advantage to returning a shorter length is that the http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java index 05492e0..0b0e006 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs; -import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.Callable; @@ -37,10 +36,13 @@ import com.google.common.cache.RemovalNotification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + @InterfaceAudience.Private public class KeyProviderCache { - public static final Logger LOG = LoggerFactory.getLogger(KeyProviderCache.class); + public static final Logger LOG = LoggerFactory.getLogger( + KeyProviderCache.class); private final Cache<URI, KeyProvider> cache; @@ -50,14 +52,14 @@ public class KeyProviderCache { .removalListener(new RemovalListener<URI, KeyProvider>() { @Override public void onRemoval( - RemovalNotification<URI, KeyProvider> notification) { + @Nonnull RemovalNotification<URI, KeyProvider> notification) { try { + assert notification.getValue() != null; notification.getValue().close(); } catch (Throwable e) { LOG.error( "Error closing KeyProvider with uri [" + notification.getKey() + "]", e); - ; } } }) @@ -83,8 +85,8 @@ public class KeyProviderCache { } private URI createKeyProviderURI(Configuration conf) { - final String providerUriStr = - conf.getTrimmed(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, ""); + final String providerUriStr = conf.getTrimmed( + HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, ""); // No provider set in conf if (providerUriStr.isEmpty()) { LOG.error("Could not find uri with key [" @@ -104,9 +106,9 @@ public class KeyProviderCache { } @VisibleForTesting - public void setKeyProvider(Configuration conf, KeyProvider keyProvider) - throws IOException { + public void setKeyProvider(Configuration conf, KeyProvider keyProvider) { URI uri = createKeyProviderURI(conf); + assert uri != null; cache.put(uri, keyProvider); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java index 223c40d..39b188f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java @@ -297,13 +297,11 @@ public class NameNodeProxiesClient { * @param failoverProxyProvider Failover proxy provider * @return an object containing both the proxy and the associated * delegation token service it corresponds to - * @throws IOException */ @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createHAProxy( Configuration conf, URI nameNodeUri, Class<T> xface, - AbstractNNFailoverProxyProvider<T> failoverProxyProvider) - throws IOException { + AbstractNNFailoverProxyProvider<T> failoverProxyProvider) { Preconditions.checkNotNull(failoverProxyProvider); // HA case DfsClientConf config = new DfsClientConf(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java index 55aa741..8e34d40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java @@ -32,7 +32,6 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.util.IOUtilsClient; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -46,16 +45,16 @@ import org.slf4j.LoggerFactory; @VisibleForTesting public class PeerCache { private static final Logger LOG = LoggerFactory.getLogger(PeerCache.class); - + private static class Key { final DatanodeID dnID; final boolean isDomain; - + Key(DatanodeID dnID, boolean isDomain) { this.dnID = dnID; this.isDomain = isDomain; } - + @Override public boolean equals(Object o) { if (!(o instanceof Key)) { @@ -70,7 +69,7 @@ public class PeerCache { return dnID.hashCode() ^ (isDomain ? 1 : 0); } } - + private static class Value { private final Peer peer; private final long time; @@ -92,10 +91,10 @@ public class PeerCache { private Daemon daemon; /** A map for per user per datanode. */ private final LinkedListMultimap<Key, Value> multimap = - LinkedListMultimap.create(); + LinkedListMultimap.create(); private final int capacity; private final long expiryPeriod; - + public PeerCache(int c, long e) { this.capacity = c; this.expiryPeriod = e; @@ -107,17 +106,17 @@ public class PeerCache { expiryPeriod + " when cache is enabled."); } } - + private boolean isDaemonStarted() { - return (daemon == null)? false: true; + return daemon != null; } private synchronized void startExpiryDaemon() { // start daemon only if not already started - if (isDaemonStarted() == true) { + if (isDaemonStarted()) { return; } - + daemon = new Daemon(new Runnable() { @Override public void run() { @@ -144,7 +143,7 @@ public class PeerCache { * @param isDomain Whether to retrieve a DomainPeer or not. * * @return An open Peer connected to the DN, or null if none - * was found. + * was found. */ public Peer get(DatanodeID dnId, boolean isDomain) { @@ -215,12 +214,11 @@ public class PeerCache { private synchronized void evictExpired(long expiryPeriod) { while (multimap.size() != 0) { Iterator<Entry<Key, Value>> iter = - multimap.entries().iterator(); + multimap.entries().iterator(); Entry<Key, Value> entry = iter.next(); // if oldest socket expired, remove it - if (entry == null || - Time.monotonicNow() - entry.getValue().getTime() < - expiryPeriod) { + if (entry == null || + Time.monotonicNow() - entry.getValue().getTime() < expiryPeriod) { break; } IOUtilsClient.cleanup(LOG, entry.getValue().getPeer()); @@ -235,8 +233,7 @@ public class PeerCache { // We can get the oldest element immediately, because of an interesting // property of LinkedListMultimap: its iterator traverses entries in the // order that they were added. - Iterator<Entry<Key, Value>> iter = - multimap.entries().iterator(); + Iterator<Entry<Key, Value>> iter = multimap.entries().iterator(); if (!iter.hasNext()) { throw new IllegalStateException("Cannot evict from empty cache! " + "capacity: " + capacity); @@ -247,8 +244,8 @@ public class PeerCache { } /** - * Periodically check in the cache and expire the entries - * older than expiryPeriod minutes + * Periodically check in the cache and expire the entries older than + * expiryPeriod minutes. */ private void run() throws InterruptedException { for(long lastExpiryTime = Time.monotonicNow(); @@ -274,7 +271,7 @@ public class PeerCache { } multimap.clear(); } - + @VisibleForTesting void close() { clear(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 017be9f..028c964 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -55,7 +55,8 @@ import org.slf4j.LoggerFactory; /** * @deprecated this is an old implementation that is being left around - * in case any issues spring up with the new {@link RemoteBlockReader2} implementation. + * in case any issues spring up with the new {@link RemoteBlockReader2} + * implementation. * It will be removed in the next release. */ @InterfaceAudience.Private @@ -79,7 +80,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { private final long blockId; /** offset in block of of first chunk - may be less than startOffset - if startOffset is not chunk-aligned */ + if startOffset is not chunk-aligned */ private final long firstChunkOffset; private final int bytesPerChecksum; @@ -91,7 +92,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { * at the beginning so that the read can begin on a chunk boundary. */ private final long bytesNeededToFinish; - + /** * True if we are reading from a local DataNode. */ @@ -99,17 +100,17 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { private boolean eos = false; private boolean sentStatusCode = false; - + ByteBuffer checksumBytes = null; /** Amount of unread data in the current received packet */ int dataLeft = 0; - + private final PeerCache peerCache; private final Tracer tracer; - + /* FSInputChecker interface */ - + /* same interface as inputStream java.io.InputStream#read() * used by DFSInputStream#read() * This violates one rule when there is a checksum error: @@ -118,9 +119,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { * the checksum. */ @Override - public synchronized int read(byte[] buf, int off, int len) - throws IOException { - + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + // This has to be set here, *before* the skip, since we can // hit EOS during the skip, in the case that our entire read // is smaller than the checksum chunk. @@ -135,7 +136,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { throw new IOException("Could not skip required number of bytes"); } } - + int nRead = super.read(buf, off, len); // if eos was set in the previous read, send a status code to the DN @@ -152,7 +153,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { @Override public synchronized long skip(long n) throws IOException { /* How can we make sure we don't throw a ChecksumException, at least - * in majority of the cases?. This one throws. */ + * in majority of the cases?. This one throws. */ long nSkipped = 0; while (nSkipped < n) { int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE); @@ -168,18 +169,18 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { @Override public int read() throws IOException { throw new IOException("read() is not expected to be invoked. " + - "Use read(buf, off, len) instead."); + "Use read(buf, off, len) instead."); } - + @Override public boolean seekToNewSource(long targetPos) throws IOException { - /* Checksum errors are handled outside the BlockReader. - * DFSInputStream does not always call 'seekToNewSource'. In the + /* Checksum errors are handled outside the BlockReader. + * DFSInputStream does not always call 'seekToNewSource'. In the * case of pread(), it just tries a different replica without seeking. - */ + */ return false; } - + @Override public void seek(long pos) throws IOException { throw new IOException("Seek() is not supported in BlockInputChecker"); @@ -188,17 +189,17 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { @Override protected long getChunkPosition(long pos) { throw new RuntimeException("getChunkPosition() is not supported, " + - "since seek is not required"); + "since seek is not required"); } - + /** - * Makes sure that checksumBytes has enough capacity - * and limit is set to the number of checksum bytes needed + * Makes sure that checksumBytes has enough capacity + * and limit is set to the number of checksum bytes needed * to be read. */ private void adjustChecksumBytes(int dataLen) { - int requiredSize = - ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; + int requiredSize = + ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; if (checksumBytes == null || requiredSize > checksumBytes.capacity()) { checksumBytes = ByteBuffer.wrap(new byte[requiredSize]); } else { @@ -206,42 +207,39 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { } checksumBytes.limit(requiredSize); } - + @Override - protected synchronized int readChunk(long pos, byte[] buf, int offset, - int len, byte[] checksumBuf) - throws IOException { - TraceScope scope = tracer. - newScope("RemoteBlockReader#readChunk(" + blockId + ")"); - try { + protected synchronized int readChunk(long pos, byte[] buf, int offset, + int len, byte[] checksumBuf) + throws IOException { + try (TraceScope ignored = tracer.newScope( + "RemoteBlockReader#readChunk(" + blockId + ")")) { return readChunkImpl(pos, buf, offset, len, checksumBuf); - } finally { - scope.close(); } } private synchronized int readChunkImpl(long pos, byte[] buf, int offset, - int len, byte[] checksumBuf) - throws IOException { + int len, byte[] checksumBuf) + throws IOException { // Read one chunk. if (eos) { // Already hit EOF return -1; } - + // Read one DATA_CHUNK. long chunkOffset = lastChunkOffset; if ( lastChunkLen > 0 ) { chunkOffset += lastChunkLen; } - + // pos is relative to the start of the first chunk of the read. // chunkOffset is relative to the start of the block. // This makes sure that the read passed from FSInputChecker is the // for the same chunk we expect to be reading from the DN. if ( (pos + firstChunkOffset) != chunkOffset ) { - throw new IOException("Mismatch in pos : " + pos + " + " + - firstChunkOffset + " != " + chunkOffset); + throw new IOException("Mismatch in pos : " + pos + " + " + + firstChunkOffset + " != " + chunkOffset); } // Read next packet if the previous packet has been read completely. @@ -254,8 +252,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { // Sanity check the lengths if (!header.sanityCheck(lastSeqNo)) { - throw new IOException("BlockReader: error in packet header " + - header); + throw new IOException("BlockReader: error in packet header " + + header); } lastSeqNo = header.getSeqno(); @@ -263,7 +261,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { adjustChecksumBytes(header.getDataLen()); if (header.getDataLen() > 0) { IOUtils.readFully(in, checksumBytes.array(), 0, - checksumBytes.limit()); + checksumBytes.limit()); } } @@ -284,14 +282,14 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { // How many chunks we can fit in databuffer // - note this is a floor since we always read full chunks int chunksCanFit = Math.min(len / bytesPerChecksum, - checksumBuf.length / checksumSize); + checksumBuf.length / checksumSize); // How many chunks should we read checksumsToRead = Math.min(chunksLeft, chunksCanFit); // How many bytes should we actually read bytesToRead = Math.min( - checksumsToRead * bytesPerChecksum, // full chunks - dataLeft); // in case we have a partial + checksumsToRead * bytesPerChecksum, // full chunks + dataLeft); // in case we have a partial } else { // no checksum bytesToRead = Math.min(dataLeft, len); @@ -328,7 +326,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { if (!hdr.isLastPacketInBlock() || hdr.getDataLen() != 0) { throw new IOException("Expected empty end-of-read packet! Header: " + - hdr); + hdr); } eos = true; @@ -340,22 +338,22 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { return bytesToRead; } - + private RemoteBlockReader(String file, String bpid, long blockId, DataInputStream in, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) { // Path is used only for printing block and file information in debug super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId + - ":" + bpid + ":of:"+ file)/*too non path-like?*/, - 1, verifyChecksum, - checksum.getChecksumSize() > 0? checksum : null, - checksum.getBytesPerChecksum(), - checksum.getChecksumSize()); + ":" + bpid + ":of:"+ file)/*too non path-like?*/, + 1, verifyChecksum, + checksum.getChecksumSize() > 0? checksum : null, + checksum.getBytesPerChecksum(), + checksum.getChecksumSize()); this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. createSocketAddr(datanodeID.getXferAddr())); - + this.peer = peer; this.datanodeID = datanodeID; this.in = in; @@ -394,46 +392,46 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { * @return New BlockReader instance, or null on error. */ public static RemoteBlockReader newBlockReader(String file, - ExtendedBlock block, - Token<BlockTokenIdentifier> blockToken, - long startOffset, long len, - int bufferSize, boolean verifyChecksum, - String clientName, Peer peer, - DatanodeID datanodeID, - PeerCache peerCache, - CachingStrategy cachingStrategy, - Tracer tracer) - throws IOException { + ExtendedBlock block, + Token<BlockTokenIdentifier> blockToken, + long startOffset, long len, + int bufferSize, boolean verifyChecksum, + String clientName, Peer peer, + DatanodeID datanodeID, + PeerCache peerCache, + CachingStrategy cachingStrategy, + Tracer tracer) + throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy); - + // // Get bytes in block, set streams // DataInputStream in = new DataInputStream( new BufferedInputStream(peer.getInputStream(), bufferSize)); - + BlockOpResponseProto status = BlockOpResponseProto.parseFrom( PBHelperClient.vintPrefixed(in)); RemoteBlockReader2.checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = - status.getReadOpChecksumInfo(); + status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? - + // Read the first chunk offset. long firstChunkOffset = checksumInfo.getChunkOffset(); - + if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + - firstChunkOffset + ") startOffset is " + - startOffset + " for file " + file); + firstChunkOffset + ") startOffset is " + + startOffset + " for file " + file); } return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), @@ -453,7 +451,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { // in will be closed when its Socket is closed. } - + @Override public void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException { @@ -479,7 +477,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. LOG.info("Could not send read status (" + statusCode + ") to datanode " + - peer.getRemoteAddressString() + ": " + e.getMessage()); + peer.getRemoteAddressString() + ": " + e.getMessage()); } } @@ -487,9 +485,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { public int read(ByteBuffer buf) throws IOException { throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader"); } - + @Override - public int available() throws IOException { + public int available() { // An optimistic estimate of how much data is available // to us without doing network I/O. return RemoteBlockReader2.TCP_WINDOW_SIZE; @@ -499,7 +497,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { public boolean isLocal() { return isLocal; } - + @Override public boolean isShortCircuit() { return false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index ca31e67..c15bd1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -131,21 +131,19 @@ public class RemoteBlockReader2 implements BlockReader { public Peer getPeer() { return peer; } - + @Override - public synchronized int read(byte[] buf, int off, int len) - throws IOException { + public synchronized int read(byte[] buf, int off, int len) + throws IOException { UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null); LOG.trace("Starting read #{} file {} from datanode {}", randomId, filename, datanodeID.getHostName()); - if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - TraceScope scope = tracer.newScope( - "RemoteBlockReader2#readNextPacket(" + blockId + ")"); - try { + if (curDataSlice == null || + curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + try (TraceScope ignored = tracer.newScope( + "RemoteBlockReader2#readNextPacket(" + blockId + ")")) { readNextPacket(); - } finally { - scope.close(); } } @@ -155,23 +153,21 @@ public class RemoteBlockReader2 implements BlockReader { // we're at EOF now return -1; } - + int nRead = Math.min(curDataSlice.remaining(), len); curDataSlice.get(buf, off, nRead); - + return nRead; } @Override public synchronized int read(ByteBuffer buf) throws IOException { - if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - TraceScope scope = tracer.newScope( - "RemoteBlockReader2#readNextPacket(" + blockId + ")"); - try { + if (curDataSlice == null || + (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) { + try (TraceScope ignored = tracer.newScope( + "RemoteBlockReader2#readNextPacket(" + blockId + ")")) { readNextPacket(); - } finally { - scope.close(); } } if (curDataSlice.remaining() == 0) { @@ -195,23 +191,24 @@ public class RemoteBlockReader2 implements BlockReader { PacketHeader curHeader = packetReceiver.getHeader(); curDataSlice = packetReceiver.getDataSlice(); assert curDataSlice.capacity() == curHeader.getDataLen(); - + LOG.trace("DFSClient readNextPacket got header {}", curHeader); // Sanity check the lengths if (!curHeader.sanityCheck(lastSeqNo)) { - throw new IOException("BlockReader: error in packet header " + - curHeader); + throw new IOException("BlockReader: error in packet header " + + curHeader); } - + if (curHeader.getDataLen() > 0) { int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; int checksumsLen = chunks * checksumSize; assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : - "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + - " checksumsLen=" + checksumsLen; - + "checksum slice capacity=" + + packetReceiver.getChecksumSlice().capacity() + + " checksumsLen=" + checksumsLen; + lastSeqNo = curHeader.getSeqno(); if (verifyChecksum && curDataSlice.remaining() > 0) { // N.B.: the checksum error offset reported here is actually @@ -223,8 +220,8 @@ public class RemoteBlockReader2 implements BlockReader { filename, curHeader.getOffsetInBlock()); } bytesNeededToFinish -= curHeader.getDataLen(); - } - + } + // First packet will include some data prior to the first byte // the user requested. Skip it. if (curHeader.getOffsetInBlock() < startOffset) { @@ -243,7 +240,7 @@ public class RemoteBlockReader2 implements BlockReader { } } } - + @Override public synchronized long skip(long n) throws IOException { /* How can we make sure we don't throw a ChecksumException, at least @@ -251,7 +248,8 @@ public class RemoteBlockReader2 implements BlockReader { long skipped = 0; while (skipped < n) { long needToSkip = n - skipped; - if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + if (curDataSlice == null || + curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { readNextPacket(); } if (curDataSlice.remaining() == 0) { @@ -273,13 +271,13 @@ public class RemoteBlockReader2 implements BlockReader { PacketHeader trailer = packetReceiver.getHeader(); if (!trailer.isLastPacketInBlock() || - trailer.getDataLen() != 0) { + trailer.getDataLen() != 0) { throw new IOException("Expected empty end-of-read packet! Header: " + - trailer); + trailer); } } - protected RemoteBlockReader2(String file, String bpid, long blockId, + protected RemoteBlockReader2(String file, long blockId, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) { @@ -320,7 +318,7 @@ public class RemoteBlockReader2 implements BlockReader { // in will be closed when its Socket is closed. } - + /** * When the reader reaches end of the read, it sends a status response * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN @@ -335,7 +333,7 @@ public class RemoteBlockReader2 implements BlockReader { } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. LOG.info("Could not send read status (" + statusCode + ") to datanode " + - peer.getRemoteAddressString() + ": " + e.getMessage()); + peer.getRemoteAddressString() + ": " + e.getMessage()); } } @@ -344,15 +342,15 @@ public class RemoteBlockReader2 implements BlockReader { */ static void writeReadResult(OutputStream out, Status statusCode) throws IOException { - + ClientReadStatusProto.newBuilder() - .setStatus(statusCode) - .build() - .writeDelimitedTo(out); + .setStatus(statusCode) + .build() + .writeDelimitedTo(out); out.flush(); } - + /** * File name to print when accessing a block directly (from servlets) * @param s Address of the block location @@ -374,7 +372,7 @@ public class RemoteBlockReader2 implements BlockReader { public void readFully(byte[] buf, int off, int len) throws IOException { BlockReaderUtil.readFully(this, buf, off, len); } - + /** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. @@ -391,18 +389,18 @@ public class RemoteBlockReader2 implements BlockReader { * @return New BlockReader instance, or null on error. */ public static BlockReader newBlockReader(String file, - ExtendedBlock block, - Token<BlockTokenIdentifier> blockToken, - long startOffset, long len, - boolean verifyChecksum, - String clientName, - Peer peer, DatanodeID datanodeID, - PeerCache peerCache, - CachingStrategy cachingStrategy, - Tracer tracer) throws IOException { + ExtendedBlock block, + Token<BlockTokenIdentifier> blockToken, + long startOffset, long len, + boolean verifyChecksum, + String clientName, + Peer peer, DatanodeID datanodeID, + PeerCache peerCache, + CachingStrategy cachingStrategy, + Tracer tracer) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - peer.getOutputStream())); + peer.getOutputStream())); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy); @@ -415,7 +413,7 @@ public class RemoteBlockReader2 implements BlockReader { PBHelperClient.vintPrefixed(in)); checkSuccess(status, peer, block, file); ReadOpChecksumInfoProto checksumInfo = - status.getReadOpChecksumInfo(); + status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); //Warning when we get CHECKSUM_NULL? @@ -426,13 +424,13 @@ public class RemoteBlockReader2 implements BlockReader { if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + - firstChunkOffset + ") startOffset is " + - startOffset + " for file " + file); + firstChunkOffset + ") startOffset is " + + startOffset + " for file " + file); } - return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), - checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, - datanodeID, peerCache, tracer); + return new RemoteBlockReader2(file, block.getBlockId(), checksum, + verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, + peerCache, tracer); } static void checkSuccess( @@ -440,26 +438,26 @@ public class RemoteBlockReader2 implements BlockReader { ExtendedBlock block, String file) throws IOException { String logInfo = "for OP_READ_BLOCK" - + ", self=" + peer.getLocalAddressString() - + ", remote=" + peer.getRemoteAddressString() - + ", for file " + file - + ", for pool " + block.getBlockPoolId() - + " block " + block.getBlockId() + "_" + block.getGenerationStamp(); + + ", self=" + peer.getLocalAddressString() + + ", remote=" + peer.getRemoteAddressString() + + ", for file " + file + + ", for pool " + block.getBlockPoolId() + + " block " + block.getBlockId() + "_" + block.getGenerationStamp(); DataTransferProtoUtil.checkBlockOpStatus(status, logInfo); } - + @Override - public int available() throws IOException { + public int available() { // An optimistic estimate of how much data is available // to us without doing network I/O. return TCP_WINDOW_SIZE; } - + @Override public boolean isLocal() { return isLocal; } - + @Override public boolean isShortCircuit() { return false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java index f03e179..d199b81 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java @@ -34,7 +34,7 @@ public interface RemotePeerFactory { * @param datanodeId ID of destination DataNode * @return A new Peer connected to the address. * - * @throws IOException If there was an error connecting or creating + * @throws IOException If there was an error connecting or creating * the remote socket, encrypted stream, etc. */ Peer newConnectedPeer(InetSocketAddress addr,
