Author: jing9
Date: Thu May 22 18:50:20 2014
New Revision: 1596937

URL: http://svn.apache.org/r1596937
Log:
MAPREDUCE-5899. Merge change r1596931 from trunk.
Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
      - copied unchanged from r1596931, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Thu May 22 18:50:20 2014
@@ -115,7 +115,7 @@ public class Hdfs extends AbstractFileSy
   @Override
   public FileChecksum getFileChecksum(Path f)
       throws IOException, UnresolvedLinkException {
-    return dfs.getFileChecksum(getUriPath(f));
+    return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE);
   }
 
   @Override

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu May 22 18:50:20 2014
@@ -1817,15 +1817,19 @@ public class DFSClient implements java.i
   }
 
   /**
-   * Get the checksum of a file.
+   * Get the checksum of the whole file or a range of the file. Note that the
+   * range always starts from the beginning of the file.
    * @param src The file path
+   * @param length The length of the range
    * @return The checksum
   * @see DistributedFileSystem#getFileChecksum(Path)
    */
-  public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
+  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
+      throws IOException {
     checkOpen();
-    return getFileChecksum(src, clientName, namenode, socketFactory,
-        dfsClientConf.socketTimeout, getDataEncryptionKey(),
+    Preconditions.checkArgument(length >= 0);
+    return getFileChecksum(src, length, clientName, namenode,
+        socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(),
         dfsClientConf.connectToDnViaHostname);
   }
 
@@ -1866,8 +1870,9 @@ public class DFSClient implements java.i
   }
 
   /**
-   * Get the checksum of a file.
+   * Get the checksum of the whole file or a range of the file.
    * @param src The file path
+   * @param length the length of the range, i.e., the range is [0, length)
    * @param clientName the name of the client requesting the checksum.
    * @param namenode the RPC proxy for the namenode
    * @param socketFactory to create sockets to connect to DNs
@@ -1877,12 +1882,13 @@ public class DFSClient implements java.i
    * @return The checksum
    */
   private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
-      String clientName,
-      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+      long length, String clientName, ClientProtocol namenode,
+      SocketFactory socketFactory, int socketTimeout,
       DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
       throws IOException {
-    //get all block locations
-    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+    //get block locations for the file range
+    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
+        length);
     if (null == blockLocations) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
@@ -1894,10 +1900,11 @@ public class DFSClient implements java.i
     boolean refetchBlocks = false;
     int lastRetriedIndex = -1;
 
-    //get block checksum for each block
-    for(int i = 0; i < locatedblocks.size(); i++) {
+    // get block checksum for each block
+    long remaining = length;
+    for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
       if (refetchBlocks) {  // refetch to get fresh tokens
-        blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+        blockLocations = callGetBlockLocations(namenode, src, 0, length);
         if (null == blockLocations) {
           throw new FileNotFoundException("File does not exist: " + src);
         }
@@ -1906,6 +1913,10 @@ public class DFSClient implements java.i
       }
       LocatedBlock lb = locatedblocks.get(i);
       final ExtendedBlock block = lb.getBlock();
+      if (remaining < block.getNumBytes()) {
+        block.setNumBytes(remaining);
+      }
+      remaining -= block.getNumBytes();
      final DatanodeInfo[] datanodes = lb.getLocations();
 
       //try each datanode location of the block
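A usage sketch, not part of this patch, of how the reworked DFSClient API might be driven. DistributedFileSystem#getClient() exposes the underlying DFSClient; the NameNode URI, path, and byte counts below are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class RangeChecksumDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedFileSystem fs = (DistributedFileSystem)
            new Path("hdfs://namenode:8020/").getFileSystem(conf);
        DFSClient client = fs.getClient();

        // Checksum over the first 1 MB only. Per the javadoc above, the
        // range is always [0, length), anchored at the start of the file.
        MD5MD5CRC32FileChecksum prefix =
            client.getFileChecksum("/data/file.bin", 1024L * 1024);

        // Long.MAX_VALUE reproduces the old whole-file behavior; this is
        // what the updated callers in this patch pass.
        MD5MD5CRC32FileChecksum whole =
            client.getFileChecksum("/data/file.bin", Long.MAX_VALUE);

        System.out.println(prefix + " vs " + whole);
      }
    }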
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu May 22 18:50:20 2014
@@ -66,14 +66,12 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -83,7 +81,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
@@ -1188,7 +1185,7 @@ public class DistributedFileSystem exten
     @Override
     public FileChecksum doCall(final Path p)
         throws IOException, UnresolvedLinkException {
-      return dfs.getFileChecksum(getPathName(p));
+      return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
     }
 
     @Override
@@ -1200,6 +1197,32 @@ public class DistributedFileSystem exten
   }
 
   @Override
+  public FileChecksum getFileChecksum(Path f, final long length)
+      throws IOException {
+    statistics.incrementReadOps(1);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FileChecksum>() {
+      @Override
+      public FileChecksum doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.getFileChecksum(getPathName(p), length);
+      }
+
+      @Override
+      public FileChecksum next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          return ((DistributedFileSystem) fs).getFileChecksum(p, length);
+        } else {
+          throw new UnsupportedFileSystemException(
+              "getFileChecksum(Path, long) is not supported by "
+                  + fs.getClass().getSimpleName());
+        }
+      }
+    }.resolve(this, absF);
+  }
+
+  @Override
   public void setPermission(Path p, final FsPermission permission
       ) throws IOException {
     statistics.incrementWriteOps(1);
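The new public DistributedFileSystem#getFileChecksum(Path, long) is the piece the MAPREDUCE-5899 incremental-DistCp work builds on: checking whether a shorter target file is a byte-for-byte prefix of a source file, so a copy can be resumed by appending. A hedged sketch of that use, assuming both files were written with the same block size and bytes-per-checksum; the paths and NameNode URI are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class PrefixMatch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedFileSystem fs = (DistributedFileSystem)
            new Path("hdfs://namenode:8020/").getFileSystem(conf);

        Path source = new Path("/src/file.bin");   // hypothetical
        Path target = new Path("/dst/file.bin");   // hypothetical, shorter
        long targetLen = fs.getFileStatus(target).getLen();

        // Source checksum restricted to [0, targetLen) vs. the whole target.
        FileChecksum srcPrefix = fs.getFileChecksum(source, targetLen);
        FileChecksum dstWhole = fs.getFileChecksum(target);

        // Equal composite checksums indicate the target matches the source
        // prefix, so the remaining bytes can simply be appended.
        System.out.println("prefix match: " + srcPrefix.equals(dstWhole));
      }
    }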
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu May 22 18:50:20 2014
@@ -42,6 +42,7 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
+import java.security.MessageDigest;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
@@ -83,6 +84,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
 import com.google.protobuf.ByteString;
 
@@ -802,7 +804,44 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(out);
     }
   }
-  
+
+  private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
+      long requestLength, DataChecksum checksum, DataInputStream checksumIn)
+      throws IOException {
+    final int bytesPerCRC = checksum.getBytesPerChecksum();
+    final int csize = checksum.getChecksumSize();
+    final byte[] buffer = new byte[4*1024];
+    MessageDigest digester = MD5Hash.getDigester();
+
+    long remaining = requestLength / bytesPerCRC * csize;
+    for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
+      toDigest = checksumIn.read(buffer, 0,
+          (int) Math.min(remaining, buffer.length));
+      if (toDigest < 0) {
+        break;
+      }
+      digester.update(buffer, 0, toDigest);
+    }
+
+    int partialLength = (int) (requestLength % bytesPerCRC);
+    if (partialLength > 0) {
+      byte[] buf = new byte[partialLength];
+      final InputStream blockIn = datanode.data.getBlockInputStream(block,
+          requestLength - partialLength);
+      try {
+        // Get the CRC of the partialLength.
+        IOUtils.readFully(blockIn, buf, 0, partialLength);
+      } finally {
+        IOUtils.closeStream(blockIn);
+      }
+      checksum.update(buf, 0, partialLength);
+      byte[] partialCrc = new byte[csize];
+      checksum.writeValue(partialCrc, 0, true);
+      digester.update(partialCrc);
+    }
+    return new MD5Hash(digester.digest());
+  }
+
   @Override
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
@@ -810,25 +849,32 @@ class DataXceiver extends Receiver imple
         getOutputStream());
     checkAccess(out, true, block, blockToken,
         Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
-    updateCurrentThreadName("Reading metadata for block " + block);
-    final LengthInputStream metadataIn =
-        datanode.data.getMetaDataInputStream(block);
-    final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
-        metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+    // the client side can now specify a range of the block for the checksum
+    long requestLength = block.getNumBytes();
+    Preconditions.checkArgument(requestLength >= 0);
+    long visibleLength = datanode.data.getReplicaVisibleLength(block);
+    boolean partialBlk = requestLength < visibleLength;
 
+    updateCurrentThreadName("Reading metadata for block " + block);
+    final LengthInputStream metadataIn = datanode.data
+        .getMetaDataInputStream(block);
+
+    final DataInputStream checksumIn = new DataInputStream(
+        new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
     updateCurrentThreadName("Getting checksum for block " + block);
     try {
       //read metadata file
-      final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-      final DataChecksum checksum = header.getChecksum();
+      final BlockMetadataHeader header = BlockMetadataHeader
+          .readHeader(checksumIn);
+      final DataChecksum checksum = header.getChecksum();
+      final int csize = checksum.getChecksumSize();
       final int bytesPerCRC = checksum.getBytesPerChecksum();
-      final long crcPerBlock = checksum.getChecksumSize() > 0
-          ? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize()
-          : 0;
-
-      //compute block checksum
-      final MD5Hash md5 = MD5Hash.digest(checksumIn);
+      final long crcPerBlock = csize <= 0 ? 0 :
+          (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
+      final MD5Hash md5 = partialBlk && crcPerBlock > 0 ?
+          calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
+          : MD5Hash.digest(checksumIn);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
             + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
@@ -841,8 +887,7 @@ class DataXceiver extends Receiver imple
             .setBytesPerCrc(bytesPerCRC)
             .setCrcPerBlock(crcPerBlock)
             .setMd5(ByteString.copyFrom(md5.getDigest()))
-            .setCrcType(PBHelper.convert(checksum.getChecksumType()))
-            )
+            .setCrcType(PBHelper.convert(checksum.getChecksumType())))
         .build()
         .writeDelimitedTo(out);
       out.flush();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Thu May 22 18:50:20 2014
@@ -74,7 +74,6 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
 import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -452,7 +451,7 @@ public class DatanodeWebHdfsMethods {
       MD5MD5CRC32FileChecksum checksum = null;
       DFSClient dfsclient = newDfsClient(nnId, conf);
       try {
-        checksum = dfsclient.getFileChecksum(fullpath);
+        checksum = dfsclient.getFileChecksum(fullpath, Long.MAX_VALUE);
         dfsclient.close();
         dfsclient = null;
       } finally {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Thu May 22 18:50:20 2014
@@ -121,7 +121,7 @@ public class FileChecksumServlets {
       try {
         final DFSClient dfs = DatanodeJspHelper.getDFSClient(request,
             datanode, conf, getUGI(request, conf));
-        final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path);
+        final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path, Long.MAX_VALUE);
         MD5MD5CRC32FileChecksum.write(xml, checksum);
       } catch(IOException ioe) {
         writeXml(ioe, path, xml);
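For reference, the heart of the datanode-side change is calcPartialBlockChecksum: MD5-digest the CRCs already stored in the block's metadata file for every chunk that lies wholly inside the requested range, then recompute the CRC of the trailing partial chunk from the block data itself, since the stored CRC covers the full chunk. Below is a minimal, self-contained sketch of that technique, with java.util.zip.CRC32 standing in for Hadoop's DataChecksum and in-memory arrays standing in for the block and its metadata file; all names and sizes are illustrative, not Hadoop APIs.

    import java.nio.ByteBuffer;
    import java.security.MessageDigest;
    import java.util.Arrays;
    import java.util.zip.CRC32;

    public class PartialBlockChecksumSketch {

      static byte[] partialBlockMd5(byte[] blockData, byte[] storedCrcs,
          int bytesPerCrc, long requestLength) throws Exception {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        final int csize = 4;  // a CRC32 value occupies 4 bytes

        // 1. Digest the stored CRC of every chunk wholly inside the range.
        int wholeChunks = (int) (requestLength / bytesPerCrc);
        digester.update(storedCrcs, 0, wholeChunks * csize);

        // 2. Recompute the CRC of the trailing partial chunk from the block
        //    data; the stored CRC covers the full chunk and cannot be reused.
        int partialLength = (int) (requestLength % bytesPerCrc);
        if (partialLength > 0) {
          CRC32 crc = new CRC32();
          crc.update(blockData, wholeChunks * bytesPerCrc, partialLength);
          digester.update(ByteBuffer.allocate(csize)
              .putInt((int) crc.getValue()).array());
        }
        return digester.digest();
      }

      public static void main(String[] args) throws Exception {
        byte[] block = new byte[1000];  // stand-in block contents (all zeros)
        int bytesPerCrc = 512;

        // Stand-in metadata: the stored CRC32 of the one full 512-byte chunk.
        CRC32 c = new CRC32();
        c.update(block, 0, bytesPerCrc);
        byte[] storedCrcs =
            ByteBuffer.allocate(4).putInt((int) c.getValue()).array();

        // Checksum over the first 700 bytes: one whole chunk plus a partial
        // chunk of 188 bytes, mirroring a length-limited last block above.
        byte[] md5 = partialBlockMd5(block, storedCrcs, bytesPerCrc, 700);
        System.out.println(Arrays.toString(md5));
      }
    }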