HDFS-9733. Refactor DFSClient#getFileChecksum and DataXceiver#blockChecksum. Contributed by Kai Zheng
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/307ec80a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/307ec80a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/307ec80a Branch: refs/heads/HDFS-1312 Commit: 307ec80acae3b4a41d21b2d4b3a55032e55fcdc6 Parents: 680f3fc Author: Uma Maheswara Rao G <[email protected]> Authored: Mon Feb 29 21:52:20 2016 -0800 Committer: Uma Maheswara Rao G <[email protected]> Committed: Mon Feb 29 21:52:20 2016 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/hadoop/io/IOUtils.java | 4 +- .../main/java/org/apache/hadoop/io/MD5Hash.java | 11 + .../org/apache/hadoop/util/DataChecksum.java | 12 +- .../java/org/apache/hadoop/hdfs/DFSClient.java | 237 ++--------- .../org/apache/hadoop/hdfs/DFSUtilClient.java | 49 +++ .../apache/hadoop/hdfs/FileChecksumHelper.java | 416 +++++++++++++++++++ .../protocol/datatransfer/IOStreamPair.java | 11 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/datanode/BlockChecksumHelper.java | 254 +++++++++++ .../hdfs/server/datanode/BlockSender.java | 1 - .../hadoop/hdfs/server/datanode/DataNode.java | 10 + .../hdfs/server/datanode/DataXceiver.java | 162 +++----- 12 files changed, 846 insertions(+), 324 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java index 451163c..2588bf1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java +++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java @@ -261,7 +261,9 @@ public class IOUtils { * @param stream the Stream to close */ public static void closeStream(java.io.Closeable stream) { - cleanup(null, stream); + if (stream != null) { + cleanup(null, stream); + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java index 822e089..aaf3ea1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java @@ -128,6 +128,17 @@ public class MD5Hash implements WritableComparable<MD5Hash> { return new MD5Hash(digest); } + /** Construct a hash value for an array of byte array. */ + public static MD5Hash digest(byte[][] dataArr, int start, int len) { + byte[] digest; + MessageDigest digester = getDigester(); + for (byte[] data : dataArr) { + digester.update(data, start, len); + } + digest = digester.digest(); + return new MD5Hash(digest); + } + /** Construct a hash value for a String. 
*/ public static MD5Hash digest(String string) { return digest(UTF8.getBytes(string)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java index faac587..e44b64d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java @@ -45,7 +45,7 @@ public class DataChecksum implements Checksum { public static final int CHECKSUM_MIXED = 4; /** The checksum types */ - public static enum Type { + public enum Type { NULL (CHECKSUM_NULL, 0), CRC32 (CHECKSUM_CRC32, 4), CRC32C(CHECKSUM_CRC32C, 4), @@ -55,7 +55,7 @@ public class DataChecksum implements Checksum { public final int id; public final int size; - private Type(int id, int size) { + Type(int id, int size) { this.id = id; this.size = size; } @@ -230,17 +230,21 @@ public class DataChecksum implements Checksum { public Type getChecksumType() { return type; } + /** @return the size for a checksum. */ public int getChecksumSize() { return type.size; } + /** @return the required checksum size given the data length. */ public int getChecksumSize(int dataSize) { return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize(); } + public int getBytesPerChecksum() { return bytesPerChecksum; } + public int getNumBytesInSum() { return inSum; } @@ -249,16 +253,19 @@ public class DataChecksum implements Checksum { static public int getChecksumHeaderSize() { return 1 + SIZE_OF_INTEGER; // type byte, bytesPerChecksum int } + //Checksum Interface. Just a wrapper around member summer. 
@Override public long getValue() { return summer.getValue(); } + @Override public void reset() { summer.reset(); inSum = 0; } + @Override public void update( byte[] b, int off, int len ) { if ( len > 0 ) { @@ -266,6 +273,7 @@ public class DataChecksum implements Checksum { inSum += len; } } + @Override public void update( int b ) { summer.update( b ); http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 15a49f1..da3f745 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -27,12 +27,9 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_LOCA import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -80,9 +77,7 @@ import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.fs.HdfsBlockLocation; import org.apache.hadoop.fs.InvalidPathException; -import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; -import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; import 
org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.ParentNotDirectoryException; @@ -138,7 +133,6 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; @@ -146,20 +140,16 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactor import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.util.IOUtilsClient; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.Text; import 
org.apache.hadoop.io.retry.LossyRetryInvocationHandler; import org.apache.hadoop.ipc.RPC; @@ -1293,7 +1283,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, /** * Invoke namenode append RPC. - * It retries in case of {@link BlockNotYetCompleteException}. + * It retries in case of some {@link RetriableException}. */ private LastBlockWithStatus callAppend(String src, EnumSetWritable<CreateFlag> flag) throws IOException { @@ -1695,7 +1685,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } /** - * Get the checksum of the whole file of a range of the file. Note that the + * Get the checksum of the whole file or a range of the file. Note that the * range always starts from the beginning of the file. * @param src The file path * @param length the length of the range, i.e., the range is [0, length] @@ -1706,9 +1696,23 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throws IOException { checkOpen(); Preconditions.checkArgument(length >= 0); + + LocatedBlocks blockLocations = getBlockLocations(src, length); + + FileChecksumHelper.FileChecksumComputer maker = + new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length, + blockLocations, namenode, this); + + maker.compute(); + + return maker.getFileChecksum(); + } + + protected LocatedBlocks getBlockLocations(String src, + long length) throws IOException { //get block locations for the file range - LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, - length); + LocatedBlocks blockLocations = callGetBlockLocations(namenode, + src, 0, length); if (null == blockLocations) { throw new FileNotFoundException("File does not exist: " + src); } @@ -1716,194 +1720,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throw new IOException("Fail to get checksum, since file " + src + " is under construction."); } - List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks(); - final 
DataOutputBuffer md5out = new DataOutputBuffer(); - int bytesPerCRC = -1; - DataChecksum.Type crcType = DataChecksum.Type.DEFAULT; - long crcPerBlock = 0; - boolean refetchBlocks = false; - int lastRetriedIndex = -1; - - // get block checksum for each block - long remaining = length; - if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) { - remaining = Math.min(length, blockLocations.getFileLength()); - } - for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) { - if (refetchBlocks) { // refetch to get fresh tokens - blockLocations = callGetBlockLocations(namenode, src, 0, length); - if (null == blockLocations) { - throw new FileNotFoundException("File does not exist: " + src); - } - if (blockLocations.isUnderConstruction()) { - throw new IOException("Fail to get checksum, since file " + src - + " is under construction."); - } - locatedblocks = blockLocations.getLocatedBlocks(); - refetchBlocks = false; - } - LocatedBlock lb = locatedblocks.get(i); - final ExtendedBlock block = lb.getBlock(); - if (remaining < block.getNumBytes()) { - block.setNumBytes(remaining); - } - remaining -= block.getNumBytes(); - final DatanodeInfo[] datanodes = lb.getLocations(); - - //try each datanode location of the block - final int timeout = 3000 * datanodes.length + - dfsClientConf.getSocketTimeout(); - boolean done = false; - for(int j = 0; !done && j < datanodes.length; j++) { - DataOutputStream out = null; - DataInputStream in = null; - - try { - //connect to a datanode - IOStreamPair pair = connectToDN(datanodes[j], timeout, lb); - out = new DataOutputStream(new BufferedOutputStream(pair.out, - smallBufferSize)); - in = new DataInputStream(pair.in); - - LOG.debug("write to {}: {}, block={}", - datanodes[j], Op.BLOCK_CHECKSUM, block); - // get block MD5 - new Sender(out).blockChecksum(block, lb.getBlockToken()); - - final BlockOpResponseProto reply = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); - - String logInfo = "for block " + 
block + " from datanode " + - datanodes[j]; - DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); - - OpBlockChecksumResponseProto checksumData = - reply.getChecksumResponse(); - - //read byte-per-checksum - final int bpc = checksumData.getBytesPerCrc(); - if (i == 0) { //first block - bytesPerCRC = bpc; - } - else if (bpc != bytesPerCRC) { - throw new IOException("Byte-per-checksum not matched: bpc=" + bpc - + " but bytesPerCRC=" + bytesPerCRC); - } - - //read crc-per-block - final long cpb = checksumData.getCrcPerBlock(); - if (locatedblocks.size() > 1 && i == 0) { - crcPerBlock = cpb; - } - - //read md5 - final MD5Hash md5 = new MD5Hash( - checksumData.getMd5().toByteArray()); - md5.write(md5out); - - // read crc-type - final DataChecksum.Type ct; - if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData - .getCrcType()); - } else { - LOG.debug("Retrieving checksum from an earlier-version DataNode: " + - "inferring checksum by reading first byte"); - ct = inferChecksumTypeByReading(lb, datanodes[j]); - } - - if (i == 0) { // first block - crcType = ct; - } else if (crcType != DataChecksum.Type.MIXED - && crcType != ct) { - // if crc types are mixed in a file - crcType = DataChecksum.Type.MIXED; - } - - done = true; - - if (LOG.isDebugEnabled()) { - if (i == 0) { - LOG.debug("set bytesPerCRC=" + bytesPerCRC - + ", crcPerBlock=" + crcPerBlock); - } - LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5); - } - } catch (InvalidBlockTokenException ibte) { - if (i > lastRetriedIndex) { - LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " - + "for file {} for block {} from datanode {}. 
Will retry " - + "the block once.", - src, block, datanodes[j]); - lastRetriedIndex = i; - done = true; // actually it's not done; but we'll retry - i--; // repeat at i-th block - refetchBlocks = true; - break; - } - } catch (IOException ie) { - LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie); - } finally { - IOUtils.closeStream(in); - IOUtils.closeStream(out); - } - } - - if (!done) { - throw new IOException("Fail to get block MD5 for " + block); - } - } - //compute file MD5 - final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); - switch (crcType) { - case CRC32: - return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, - crcPerBlock, fileMD5); - case CRC32C: - return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, - crcPerBlock, fileMD5); - default: - // If there is no block allocated for the file, - // return one with the magic entry that matches what previous - // hdfs versions return. - if (locatedblocks.size() == 0) { - return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5); - } - - // we should never get here since the validity was checked - // when getCrcType() was called above. - return null; - } + return blockLocations; } - /** - * Connect to the given datanode's datantrasfer port, and return - * the resulting IOStreamPair. This includes encryption wrapping, etc. 
- */ - private IOStreamPair connectToDN(DatanodeInfo dn, int timeout, - LocatedBlock lb) throws IOException { - boolean success = false; - Socket sock = null; - try { - sock = socketFactory.createSocket(); - String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname()); - LOG.debug("Connecting to datanode {}", dnAddr); - NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout); - sock.setTcpNoDelay(dfsClientConf.getDataTransferTcpNoDelay()); - sock.setSoTimeout(timeout); - - OutputStream unbufOut = NetUtils.getOutputStream(sock); - InputStream unbufIn = NetUtils.getInputStream(sock); - IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this, - lb.getBlockToken(), dn); - success = true; - return ret; - } finally { - if (!success) { - IOUtils.closeSocket(sock); - } - } + protected IOStreamPair connectToDN(DatanodeInfo dn, int timeout, + Token<BlockTokenIdentifier> blockToken) + throws IOException { + return DFSUtilClient.connectToDN(dn, timeout, conf, saslClient, + socketFactory, getConf().isConnectToDnViaHostname(), this, blockToken); } /** @@ -1917,19 +1742,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @return the inferred checksum type * @throws IOException if an error occurs */ - private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) + protected Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) throws IOException { - IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb); + IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), + lb.getBlockToken()); try { - DataOutputStream out = new DataOutputStream( - new BufferedOutputStream(pair.out, smallBufferSize)); - DataInputStream in = new DataInputStream(pair.in); - - new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, + new Sender((DataOutputStream) pair.out).readBlock(lb.getBlock(), + lb.getBlockToken(), clientName, 0, 1, true, 
CachingStrategy.newDefaultStrategy()); final BlockOpResponseProto reply = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); + BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(pair.in)); String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn; DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index d646252..880234e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; @@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolTranslatorPB; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.util.IOUtilsClient; import org.apache.hadoop.hdfs.web.WebHdfsConstants; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.UserGroupInformation; @@ -56,8 +58,13 
@@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.SocketFactory; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; +import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -717,4 +724,46 @@ public class DFSUtilClient { return corruptionMap; } } + + /** + * Connect to the given datanode's datantrasfer port, and return + * the resulting IOStreamPair. This includes encryption wrapping, etc. + */ + public static IOStreamPair connectToDN(DatanodeInfo dn, int timeout, + Configuration conf, + SaslDataTransferClient saslClient, + SocketFactory socketFactory, + boolean connectToDnViaHostname, + DataEncryptionKeyFactory dekFactory, + Token<BlockTokenIdentifier> blockToken) + throws IOException { + + boolean success = false; + Socket sock = null; + try { + sock = socketFactory.createSocket(); + String dnAddr = dn.getXferAddr(connectToDnViaHostname); + LOG.debug("Connecting to datanode {}", dnAddr); + NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout); + sock.setSoTimeout(timeout); + + OutputStream unbufOut = NetUtils.getOutputStream(sock); + InputStream unbufIn = NetUtils.getInputStream(sock); + IOStreamPair pair = saslClient.newSocketSend(sock, unbufOut, + unbufIn, dekFactory, blockToken, dn); + + IOStreamPair result = new IOStreamPair( + new DataInputStream(pair.in), + new DataOutputStream(new BufferedOutputStream(pair.out, + DFSUtilClient.getSmallBufferSize(conf))) + ); + + success = true; + return result; + } finally { + if (!success) { + IOUtils.closeSocket(sock); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java new file mode 100644 index 0000000..d15db9f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java @@ -0,0 +1,416 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum; +import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; +import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Op; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.util.DataChecksum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; + +/** + * Utility classes to compute file checksum for both replicated and striped + * files. + */ +final class FileChecksumHelper { + static final Logger LOG = + LoggerFactory.getLogger(FileChecksumHelper.class); + + private FileChecksumHelper() {} + + /** + * A common abstract class to compute file checksum. 
+ */ + static abstract class FileChecksumComputer { + private final String src; + private final long length; + private final DFSClient client; + private final ClientProtocol namenode; + private final DataOutputBuffer md5out = new DataOutputBuffer(); + + private MD5MD5CRC32FileChecksum fileChecksum; + private LocatedBlocks blockLocations; + + private int timeout; + private List<LocatedBlock> locatedBlocks; + private long remaining = 0L; + + private int bytesPerCRC = -1; + private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT; + private long crcPerBlock = 0; + private boolean refetchBlocks = false; + private int lastRetriedIndex = -1; + + /** + * Constructor that accepts all the input parameters for the computing. + */ + FileChecksumComputer(String src, long length, + LocatedBlocks blockLocations, + ClientProtocol namenode, + DFSClient client) throws IOException { + this.src = src; + this.length = length; + this.blockLocations = blockLocations; + this.namenode = namenode; + this.client = client; + + this.remaining = length; + if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) { + this.remaining = Math.min(length, blockLocations.getFileLength()); + } + + this.locatedBlocks = blockLocations.getLocatedBlocks(); + } + + String getSrc() { + return src; + } + + long getLength() { + return length; + } + + DFSClient getClient() { + return client; + } + + ClientProtocol getNamenode() { + return namenode; + } + + DataOutputBuffer getMd5out() { + return md5out; + } + + MD5MD5CRC32FileChecksum getFileChecksum() { + return fileChecksum; + } + + LocatedBlocks getBlockLocations() { + return blockLocations; + } + + void setBlockLocations(LocatedBlocks blockLocations) { + this.blockLocations = blockLocations; + } + + int getTimeout() { + return timeout; + } + + void setTimeout(int timeout) { + this.timeout = timeout; + } + + List<LocatedBlock> getLocatedBlocks() { + return locatedBlocks; + } + + void setLocatedBlocks(List<LocatedBlock> locatedBlocks) { + 
this.locatedBlocks = locatedBlocks; + } + + long getRemaining() { + return remaining; + } + + void setRemaining(long remaining) { + this.remaining = remaining; + } + + int getBytesPerCRC() { + return bytesPerCRC; + } + + void setBytesPerCRC(int bytesPerCRC) { + this.bytesPerCRC = bytesPerCRC; + } + + DataChecksum.Type getCrcType() { + return crcType; + } + + void setCrcType(DataChecksum.Type crcType) { + this.crcType = crcType; + } + + long getCrcPerBlock() { + return crcPerBlock; + } + + void setCrcPerBlock(long crcPerBlock) { + this.crcPerBlock = crcPerBlock; + } + + boolean isRefetchBlocks() { + return refetchBlocks; + } + + void setRefetchBlocks(boolean refetchBlocks) { + this.refetchBlocks = refetchBlocks; + } + + int getLastRetriedIndex() { + return lastRetriedIndex; + } + + void setLastRetriedIndex(int lastRetriedIndex) { + this.lastRetriedIndex = lastRetriedIndex; + } + + /** + * Perform the file checksum computing. The intermediate results are stored + * in the object and will be used later. + * @throws IOException + */ + void compute() throws IOException { + checksumBlocks(); + + fileChecksum = makeFinalResult(); + } + + /** + * Compute and aggregate block checksums block by block. + * @throws IOException + */ + abstract void checksumBlocks() throws IOException; + + /** + * Make final file checksum result given the computing process done. + */ + MD5MD5CRC32FileChecksum makeFinalResult() { + //compute file MD5 + final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); + switch (crcType) { + case CRC32: + return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, + crcPerBlock, fileMD5); + case CRC32C: + return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, + crcPerBlock, fileMD5); + default: + // If there is no block allocated for the file, + // return one with the magic entry that matches what previous + // hdfs versions return. 
+ if (locatedBlocks.isEmpty()) { + return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5); + } + + // we should never get here since the validity was checked + // when getCrcType() was called above. + return null; + } + } + + /** + * Create and return a sender given an IO stream pair. + */ + Sender createSender(IOStreamPair pair) { + DataOutputStream out = (DataOutputStream) pair.out; + return new Sender(out); + } + + /** + * Close an IO stream pair. + */ + void close(IOStreamPair pair) { + if (pair != null) { + IOUtils.closeStream(pair.in); + IOUtils.closeStream(pair.out); + } + } + } + + /** + * Replicated file checksum computer. + */ + static class ReplicatedFileChecksumComputer extends FileChecksumComputer { + private int blockIdx; + + ReplicatedFileChecksumComputer(String src, long length, + LocatedBlocks blockLocations, + ClientProtocol namenode, + DFSClient client) throws IOException { + super(src, length, blockLocations, namenode, client); + } + + @Override + void checksumBlocks() throws IOException { + // get block checksum for each block + for (blockIdx = 0; + blockIdx < getLocatedBlocks().size() && getRemaining() >= 0; + blockIdx++) { + if (isRefetchBlocks()) { // refetch to get fresh tokens + setBlockLocations(getClient().getBlockLocations(getSrc(), + getLength())); + setLocatedBlocks(getBlockLocations().getLocatedBlocks()); + setRefetchBlocks(false); + } + + LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx); + + if (!checksumBlock(locatedBlock)) { + throw new IOException("Fail to get block MD5 for " + locatedBlock); + } + } + } + + /** + * Return true when sounds good to continue or retry, false when severe + * condition or totally failed. 
+ */ + private boolean checksumBlock( + LocatedBlock locatedBlock) throws IOException { + ExtendedBlock block = locatedBlock.getBlock(); + if (getRemaining() < block.getNumBytes()) { + block.setNumBytes(getRemaining()); + } + setRemaining(getRemaining() - block.getNumBytes()); + + DatanodeInfo[] datanodes = locatedBlock.getLocations(); + + int tmpTimeout = 3000 * datanodes.length + + getClient().getConf().getSocketTimeout(); + setTimeout(tmpTimeout); + + //try each datanode location of the block + boolean done = false; + for (int j = 0; !done && j < datanodes.length; j++) { + try { + tryDatanode(locatedBlock, datanodes[j]); + done = true; + } catch (InvalidBlockTokenException ibte) { + if (blockIdx > getLastRetriedIndex()) { + LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " + + "for file {} for block {} from datanode {}. Will retry " + + "the block once.", + getSrc(), block, datanodes[j]); + setLastRetriedIndex(blockIdx); + done = true; // actually it's not done; but we'll retry + blockIdx--; // repeat at blockIdx-th block + setRefetchBlocks(true); + } + } catch (IOException ie) { + LOG.warn("src={}" + ", datanodes[{}]={}", + getSrc(), j, datanodes[j], ie); + } + } + + return done; + } + + /** + * Try one replica or datanode to compute the block checksum given a block. 
+ */ + private void tryDatanode(LocatedBlock locatedBlock, + DatanodeInfo datanode) throws IOException { + + ExtendedBlock block = locatedBlock.getBlock(); + + try (IOStreamPair pair = getClient().connectToDN(datanode, getTimeout(), + locatedBlock.getBlockToken())) { + + LOG.debug("write to {}: {}, block={}", datanode, + Op.BLOCK_CHECKSUM, block); + + // get block MD5 + createSender(pair).blockChecksum(block, + locatedBlock.getBlockToken()); + + final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(pair.in)); + + String logInfo = "for block " + block + " from datanode " + + datanode; + DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); + + OpBlockChecksumResponseProto checksumData = + reply.getChecksumResponse(); + + //read byte-per-checksum + final int bpc = checksumData.getBytesPerCrc(); + if (blockIdx == 0) { //first block + setBytesPerCRC(bpc); + } else if (bpc != getBytesPerCRC()) { + throw new IOException("Byte-per-checksum not matched: bpc=" + bpc + + " but bytesPerCRC=" + getBytesPerCRC()); + } + + //read crc-per-block + final long cpb = checksumData.getCrcPerBlock(); + if (getLocatedBlocks().size() > 1 && blockIdx == 0) { + setCrcPerBlock(cpb); + } + + //read md5 + final MD5Hash md5 = new MD5Hash( + checksumData.getMd5().toByteArray()); + md5.write(getMd5out()); + + // read crc-type + final DataChecksum.Type ct; + if (checksumData.hasCrcType()) { + ct = PBHelperClient.convert(checksumData + .getCrcType()); + } else { + LOG.debug("Retrieving checksum from an earlier-version DataNode: " + + "inferring checksum by reading first byte"); + ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode); + } + + if (blockIdx == 0) { // first block + setCrcType(ct); + } else if (getCrcType() != DataChecksum.Type.MIXED + && getCrcType() != ct) { + // if crc types are mixed in a file + setCrcType(DataChecksum.Type.MIXED); + } + + if (LOG.isDebugEnabled()) { + if (blockIdx == 0) { + LOG.debug("set bytesPerCRC=" + 
getBytesPerCRC() + + ", crcPerBlock=" + getCrcPerBlock()); + } + LOG.debug("got reply from " + datanode + ": md5=" + md5); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java index 4157a30..4ec73e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java @@ -17,16 +17,19 @@ */ package org.apache.hadoop.hdfs.protocol.datatransfer; +import java.io.Closeable; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.IOUtils; /** * A little struct class to wrap an InputStream and an OutputStream. 
*/ @InterfaceAudience.Private -public class IOStreamPair { +public class IOStreamPair implements Closeable { public final InputStream in; public final OutputStream out; @@ -34,4 +37,10 @@ public class IOStreamPair { this.in = in; this.out = out; } + + @Override + public void close() throws IOException { + IOUtils.closeStream(in); + IOUtils.closeStream(out); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c3ea5ce..bb847a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -233,6 +233,9 @@ Trunk (Unreleased) HDFS-9838. Refactor the excessReplicateMap to a class. (szetszwo) + HDFS-9733. Refactor DFSClient#getFileChecksum and DataXceiver#blockChecksum + (Kai Zheng via umamahesh) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java new file mode 100644 index 0000000..9a5552d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.util.DataChecksum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.MessageDigest; + +/** + * Utilities for Block checksum computing, for both replicated and striped + * blocks. + */ +final class BlockChecksumHelper { + + static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class); + + private BlockChecksumHelper() {} + + /** + * The abstract base block checksum computer. 
+ */ + static abstract class BlockChecksumComputer { + private final DataNode datanode; + private final ExtendedBlock block; + // client side now can specify a range of the block for checksum + private final long requestLength; + private final LengthInputStream metadataIn; + private final DataInputStream checksumIn; + private final long visibleLength; + private final boolean partialBlk; + + private byte[] outBytes; + private int bytesPerCRC = -1; + private DataChecksum.Type crcType = null; + private long crcPerBlock = -1; + private int checksumSize = -1; + private BlockMetadataHeader header; + private DataChecksum checksum; + + BlockChecksumComputer(DataNode datanode, + ExtendedBlock block) throws IOException { + this.datanode = datanode; + this.block = block; + this.requestLength = block.getNumBytes(); + Preconditions.checkArgument(requestLength >= 0); + + this.metadataIn = datanode.data.getMetaDataInputStream(block); + this.visibleLength = datanode.data.getReplicaVisibleLength(block); + this.partialBlk = requestLength < visibleLength; + + int ioFileBufferSize = + DFSUtilClient.getIoFileBufferSize(datanode.getConf()); + this.checksumIn = new DataInputStream( + new BufferedInputStream(metadataIn, ioFileBufferSize)); + } + + protected DataNode getDatanode() { + return datanode; + } + + protected ExtendedBlock getBlock() { + return block; + } + + protected long getRequestLength() { + return requestLength; + } + + protected LengthInputStream getMetadataIn() { + return metadataIn; + } + + protected DataInputStream getChecksumIn() { + return checksumIn; + } + + protected long getVisibleLength() { + return visibleLength; + } + + protected boolean isPartialBlk() { + return partialBlk; + } + + protected void setOutBytes(byte[] bytes) { + this.outBytes = bytes; + } + + protected byte[] getOutBytes() { + return outBytes; + } + + protected int getBytesPerCRC() { + return bytesPerCRC; + } + + protected DataChecksum.Type getCrcType() { + return crcType; + } + + protected long 
getCrcPerBlock() { + return crcPerBlock; + } + + protected int getChecksumSize() { + return checksumSize; + } + + protected BlockMetadataHeader getHeader() { + return header; + } + + protected DataChecksum getChecksum() { + return checksum; + } + + /** + * Perform the block checksum computing. + * @throws IOException + */ + abstract void compute() throws IOException; + + /** + * Read block metadata header. + * @throws IOException + */ + protected void readHeader() throws IOException { + //read metadata file + header = BlockMetadataHeader.readHeader(checksumIn); + checksum = header.getChecksum(); + checksumSize = checksum.getChecksumSize(); + bytesPerCRC = checksum.getBytesPerChecksum(); + crcPerBlock = checksumSize <= 0 ? 0 : + (metadataIn.getLength() - + BlockMetadataHeader.getHeaderSize()) / checksumSize; + crcType = checksum.getChecksumType(); + } + + /** + * Calculate partial block checksum. + * @return + * @throws IOException + */ + protected byte[] crcPartialBlock() throws IOException { + int partialLength = (int) (requestLength % bytesPerCRC); + if (partialLength > 0) { + byte[] buf = new byte[partialLength]; + final InputStream blockIn = datanode.data.getBlockInputStream(block, + requestLength - partialLength); + try { + // Get the CRC of the partialLength. + IOUtils.readFully(blockIn, buf, 0, partialLength); + } finally { + IOUtils.closeStream(blockIn); + } + checksum.update(buf, 0, partialLength); + byte[] partialCrc = new byte[checksumSize]; + checksum.writeValue(partialCrc, 0, true); + return partialCrc; + } + + return null; + } + } + + /** + * Replicated block checksum computer. 
*/
  static class ReplicatedBlockChecksumComputer extends BlockChecksumComputer {

    ReplicatedBlockChecksumComputer(DataNode datanode,
                                    ExtendedBlock block) throws IOException {
      super(datanode, block);
    }

    /**
     * Read the metadata header, digest either the whole checksum stream or
     * only the part covering the requested length, and publish the MD5 bytes
     * via {@code setOutBytes}. Both metadata streams are closed on exit,
     * whether or not the computation succeeded.
     */
    @Override
    void compute() throws IOException {
      try {
        readHeader();

        final boolean digestRequestedRangeOnly =
            isPartialBlk() && getCrcPerBlock() > 0;
        final MD5Hash md5out = digestRequestedRangeOnly
            ? checksumPartialBlock()
            : checksumWholeBlock();
        setOutBytes(md5out.getDigest());

        if (LOG.isDebugEnabled()) {
          LOG.debug("block=" + getBlock() + ", bytesPerCRC=" + getBytesPerCRC()
              + ", crcPerBlock=" + getCrcPerBlock() + ", md5out=" + md5out);
        }
      } finally {
        IOUtils.closeStream(getChecksumIn());
        IOUtils.closeStream(getMetadataIn());
      }
    }

    /** Digest the checksum stream as it stands after the header was read. */
    private MD5Hash checksumWholeBlock() throws IOException {
      return MD5Hash.digest(getChecksumIn());
    }

    /**
     * Digest the stored CRCs of the full chunks inside the requested length,
     * then append the CRC of the trailing partial chunk when there is one.
     */
    private MD5Hash checksumPartialBlock() throws IOException {
      final byte[] chunk = new byte[4*1024];
      final MessageDigest md5 = MD5Hash.getDigester();
      final DataInputStream crcStream = getChecksumIn();

      // Bytes of stored CRC data that belong to the full chunks.
      long crcBytesLeft = (getRequestLength() / getBytesPerCRC())
          * getChecksumSize();
      while (crcBytesLeft > 0) {
        final int wanted = (int) Math.min(crcBytesLeft, chunk.length);
        final int got = crcStream.read(chunk, 0, wanted);
        if (got < 0) {
          break;
        }
        md5.update(chunk, 0, got);
        crcBytesLeft -= got;
      }

      final byte[] tailCrc = crcPartialBlock();
      if (tailCrc != null) {
        md5.update(tailCrc);
      }

      return new MD5Hash(md5.digest());
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index ee079cb..773a64c 100644 ---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -47,7 +47,6 @@ import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.core.Sampler; import org.apache.htrace.core.TraceScope; import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED; http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 470d3ca..3e2a25d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3170,6 +3170,16 @@ public class DataNode extends ReconfigurableBase return ecWorker; } + IOStreamPair connectToDN(DatanodeInfo datanodeID, int timeout, + ExtendedBlock block, + Token<BlockTokenIdentifier> blockToken) + throws IOException { + + return DFSUtilClient.connectToDN(datanodeID, timeout, conf, saslClient, + NetUtils.getDefaultSocketFactory(getConf()), false, + getDataEncryptionKeyFactoryForBlock(block), blockToken); + } + /** * Get timeout value of each OOB type from configuration */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 0041cd8..1d4a79a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -17,37 +17,9 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR; -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN; -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID; -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED; -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS; -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION; -import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT; -import static org.apache.hadoop.util.Time.monotonicNow; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.FileDescriptor; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import 
java.nio.channels.ClosedChannelException; -import java.security.MessageDigest; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSUtilClient; @@ -73,26 +45,52 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmR import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer; +import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.StopWatch; - -import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; import org.apache.hadoop.util.Time; import org.slf4j.Logger; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; 
+import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.nio.channels.ClosedChannelException; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION; +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR; +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN; +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID; +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED; +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS; +import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT; +import static org.apache.hadoop.util.Time.monotonicNow; + /** * Thread for processing incoming/outgoing data stream. 
@@ -886,90 +884,32 @@ class DataXceiver extends Receiver implements Runnable { } } - private MD5Hash calcPartialBlockChecksum(ExtendedBlock block, - long requestLength, DataChecksum checksum, DataInputStream checksumIn) - throws IOException { - final int bytesPerCRC = checksum.getBytesPerChecksum(); - final int csize = checksum.getChecksumSize(); - final byte[] buffer = new byte[4*1024]; - MessageDigest digester = MD5Hash.getDigester(); - - long remaining = requestLength / bytesPerCRC * csize; - for (int toDigest = 0; remaining > 0; remaining -= toDigest) { - toDigest = checksumIn.read(buffer, 0, - (int) Math.min(remaining, buffer.length)); - if (toDigest < 0) { - break; - } - digester.update(buffer, 0, toDigest); - } - - int partialLength = (int) (requestLength % bytesPerCRC); - if (partialLength > 0) { - byte[] buf = new byte[partialLength]; - final InputStream blockIn = datanode.data.getBlockInputStream(block, - requestLength - partialLength); - try { - // Get the CRC of the partialLength. 
- IOUtils.readFully(blockIn, buf, 0, partialLength); - } finally { - IOUtils.closeStream(blockIn); - } - checksum.update(buf, 0, partialLength); - byte[] partialCrc = new byte[csize]; - checksum.writeValue(partialCrc, 0, true); - digester.update(partialCrc); - } - return new MD5Hash(digester.digest()); - } - @Override - public void blockChecksum(final ExtendedBlock block, - final Token<BlockTokenIdentifier> blockToken) throws IOException { + public void blockChecksum(ExtendedBlock block, + Token<BlockTokenIdentifier> blockToken) + throws IOException { updateCurrentThreadName("Getting checksum for block " + block); final DataOutputStream out = new DataOutputStream( getOutputStream()); checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ); - // client side now can specify a range of the block for checksum - long requestLength = block.getNumBytes(); - Preconditions.checkArgument(requestLength >= 0); - long visibleLength = datanode.data.getReplicaVisibleLength(block); - boolean partialBlk = requestLength < visibleLength; - - final LengthInputStream metadataIn = datanode.data - .getMetaDataInputStream(block); - - final DataInputStream checksumIn = new DataInputStream( - new BufferedInputStream(metadataIn, ioFileBufferSize)); + + BlockChecksumComputer maker = + new ReplicatedBlockChecksumComputer(datanode, block); + try { - //read metadata file - final BlockMetadataHeader header = BlockMetadataHeader - .readHeader(checksumIn); - final DataChecksum checksum = header.getChecksum(); - final int csize = checksum.getChecksumSize(); - final int bytesPerCRC = checksum.getBytesPerChecksum(); - final long crcPerBlock = csize <= 0 ? 0 : - (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize; - - final MD5Hash md5 = partialBlk && crcPerBlock > 0 ? 
- calcPartialBlockChecksum(block, requestLength, checksum, checksumIn) - : MD5Hash.digest(checksumIn); - if (LOG.isDebugEnabled()) { - LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC - + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5); - } + maker.compute(); //write reply BlockOpResponseProto.newBuilder() - .setStatus(SUCCESS) - .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder() - .setBytesPerCrc(bytesPerCRC) - .setCrcPerBlock(crcPerBlock) - .setMd5(ByteString.copyFrom(md5.getDigest())) - .setCrcType(PBHelperClient.convert(checksum.getChecksumType()))) - .build() - .writeDelimitedTo(out); + .setStatus(SUCCESS) + .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder() + .setBytesPerCrc(maker.getBytesPerCRC()) + .setCrcPerBlock(maker.getCrcPerBlock()) + .setMd5(ByteString.copyFrom(maker.getOutBytes())) + .setCrcType(PBHelperClient.convert(maker.getCrcType()))) + .build() + .writeDelimitedTo(out); out.flush(); } catch (IOException ioe) { LOG.info("blockChecksum " + block + " received exception " + ioe); @@ -977,8 +917,6 @@ class DataXceiver extends Receiver implements Runnable { throw ioe; } finally { IOUtils.closeStream(out); - IOUtils.closeStream(checksumIn); - IOUtils.closeStream(metadataIn); } //update metrics @@ -1276,7 +1214,7 @@ class DataXceiver extends Receiver implements Runnable { /** * Wait until the BP is registered, upto the configured amount of time. * Throws an exception if times out, which should fail the client request. - * @param the requested block + * @param block requested block */ void checkAndWaitForBP(final ExtendedBlock block) throws IOException {
