http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java deleted file mode 100644 index 2a77cb6..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ /dev/null @@ -1,477 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.util.EnumSet; -import java.util.UUID; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; - -import com.google.common.annotations.VisibleForTesting; - -/** - * This is a wrapper around connection to datanode - * and understands checksum, offset etc. - * - * Terminology: - * <dl> - * <dt>block</dt> - * <dd>The hdfs block, typically large (~64MB). 
- * </dd>
- * <dt>chunk</dt>
- * <dd>A block is divided into chunks, each comes with a checksum.
- *     We want transfers to be chunk-aligned, to be able to
- *     verify checksums.
- * </dd>
- * <dt>packet</dt>
- * <dd>A grouping of chunks used for transport. It contains a
- *     header, followed by checksum data, followed by real data.
- * </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
- *
- * This is a new implementation introduced in Hadoop 0.23 which
- * is more efficient and simpler than the older BlockReader
- * implementation. It should be renamed to RemoteBlockReader
- * once we are confident in it.
- */
-@InterfaceAudience.Private
-public class RemoteBlockReader2 implements BlockReader {
-
- static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
-
- final private Peer peer;
- final private DatanodeID datanodeID;
- final private PeerCache peerCache;
- final private long blockId;
- private final ReadableByteChannel in;
- private DataChecksum checksum;
-
- private final PacketReceiver packetReceiver = new PacketReceiver(true);
- private ByteBuffer curDataSlice = null;
-
- /** offset in block of the last chunk received */
- private long lastSeqNo = -1;
-
- /** offset in block where reader wants to actually read */
- private long startOffset;
- private final String filename;
-
- private final int bytesPerChecksum;
- private final int checksumSize;
-
- /**
- * The total number of bytes we need to transfer from the DN.
- * This is the amount that the user has requested plus some padding
- * at the beginning so that the read can begin on a chunk boundary.
- */
- private long bytesNeededToFinish;
-
- /**
- * True if we are reading from a local DataNode.
- */
- private final boolean isLocal;
-
- private final boolean verifyChecksum;
-
- private boolean sentStatusCode = false;
-
- @VisibleForTesting
- public Peer getPeer() {
- return peer;
- }
-
- @Override
- public synchronized int read(byte[] buf, int off, int len)
- throws IOException {
-
- UUID randomId = null;
- if (LOG.isTraceEnabled()) {
- randomId = UUID.randomUUID();
- LOG.trace(String.format("Starting read #%s file %s from datanode %s",
- randomId.toString(), this.filename,
- this.datanodeID.getHostName()));
- }
-
- if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
- TraceScope scope = Trace.startSpan(
- "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
- try {
- readNextPacket();
- } finally {
- scope.close();
- }
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Finishing read #" + randomId));
- }
-
- if (curDataSlice.remaining() == 0) {
- // we're at EOF now
- return -1;
- }
-
- int nRead = Math.min(curDataSlice.remaining(), len);
- curDataSlice.get(buf, off, nRead);
-
- return nRead;
- }
-
-
- @Override
- public synchronized int read(ByteBuffer buf) throws IOException {
- if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
- TraceScope scope = Trace.startSpan(
- "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
- try {
- readNextPacket();
- } finally {
- scope.close();
- }
- }
- if (curDataSlice.remaining() == 0) {
- // we're at EOF now
- return -1;
- }
-
- int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
- ByteBuffer writeSlice = curDataSlice.duplicate();
- writeSlice.limit(writeSlice.position() + nRead);
- buf.put(writeSlice);
- curDataSlice.position(writeSlice.position());
-
- return nRead;
- }
-
- private void readNextPacket() throws IOException {
- //Read packet
headers. - packetReceiver.receiveNextPacket(in); - - PacketHeader curHeader = packetReceiver.getHeader(); - curDataSlice = packetReceiver.getDataSlice(); - assert curDataSlice.capacity() == curHeader.getDataLen(); - - if (LOG.isTraceEnabled()) { - LOG.trace("DFSClient readNextPacket got header " + curHeader); - } - - // Sanity check the lengths - if (!curHeader.sanityCheck(lastSeqNo)) { - throw new IOException("BlockReader: error in packet header " + - curHeader); - } - - if (curHeader.getDataLen() > 0) { - int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; - int checksumsLen = chunks * checksumSize; - - assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : - "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + - " checksumsLen=" + checksumsLen; - - lastSeqNo = curHeader.getSeqno(); - if (verifyChecksum && curDataSlice.remaining() > 0) { - // N.B.: the checksum error offset reported here is actually - // relative to the start of the block, not the start of the file. - // This is slightly misleading, but preserves the behavior from - // the older BlockReader. - checksum.verifyChunkedSums(curDataSlice, - packetReceiver.getChecksumSlice(), - filename, curHeader.getOffsetInBlock()); - } - bytesNeededToFinish -= curHeader.getDataLen(); - } - - // First packet will include some data prior to the first byte - // the user requested. Skip it. - if (curHeader.getOffsetInBlock() < startOffset) { - int newPos = (int) (startOffset - curHeader.getOffsetInBlock()); - curDataSlice.position(newPos); - } - - // If we've now satisfied the whole client read, read one last packet - // header, which should be empty - if (bytesNeededToFinish <= 0) { - readTrailingEmptyPacket(); - if (verifyChecksum) { - sendReadResult(Status.CHECKSUM_OK); - } else { - sendReadResult(Status.SUCCESS); - } - } - } - - @Override - public synchronized long skip(long n) throws IOException { - /* How can we make sure we don't throw a ChecksumException, at least - * in majority of the cases?. This one throws. */ - long skipped = 0; - while (skipped < n) { - long needToSkip = n - skipped; - if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - readNextPacket(); - } - if (curDataSlice.remaining() == 0) { - // we're at EOF now - break; - } - - int skip = (int)Math.min(curDataSlice.remaining(), needToSkip); - curDataSlice.position(curDataSlice.position() + skip); - skipped += skip; - } - return skipped; - } - - private void readTrailingEmptyPacket() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Reading empty packet at end of read"); - } - - packetReceiver.receiveNextPacket(in); - - PacketHeader trailer = packetReceiver.getHeader(); - if (!trailer.isLastPacketInBlock() || - trailer.getDataLen() != 0) { - throw new IOException("Expected empty end-of-read packet! Header: " + - trailer); - } - } - - protected RemoteBlockReader2(String file, String bpid, long blockId, - DataChecksum checksum, boolean verifyChecksum, - long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID, PeerCache peerCache) { - this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. 
- createSocketAddr(datanodeID.getXferAddr())); - // Path is used only for printing block and file information in debug - this.peer = peer; - this.datanodeID = datanodeID; - this.in = peer.getInputStreamChannel(); - this.checksum = checksum; - this.verifyChecksum = verifyChecksum; - this.startOffset = Math.max( startOffset, 0 ); - this.filename = file; - this.peerCache = peerCache; - this.blockId = blockId; - - // The total number of bytes that we need to transfer from the DN is - // the amount that the user wants (bytesToRead), plus the padding at - // the beginning in order to chunk-align. Note that the DN may elect - // to send more than this amount if the read starts/ends mid-chunk. - this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); - bytesPerChecksum = this.checksum.getBytesPerChecksum(); - checksumSize = this.checksum.getChecksumSize(); - } - - - @Override - public synchronized void close() throws IOException { - packetReceiver.close(); - startOffset = -1; - checksum = null; - if (peerCache != null && sentStatusCode) { - peerCache.put(datanodeID, peer); - } else { - peer.close(); - } - - // in will be closed when its Socket is closed. - } - - /** - * When the reader reaches end of the read, it sends a status response - * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN - * closing our connection (which we will re-open), but won't affect - * data correctness. - */ - void sendReadResult(Status statusCode) { - assert !sentStatusCode : "already sent status code to " + peer; - try { - writeReadResult(peer.getOutputStream(), statusCode); - sentStatusCode = true; - } catch (IOException e) { - // It's ok not to be able to send this. But something is probably wrong. - LOG.info("Could not send read status (" + statusCode + ") to datanode " + - peer.getRemoteAddressString() + ": " + e.getMessage()); - } - } - - /** - * Serialize the actual read result on the wire. - */ - static void writeReadResult(OutputStream out, Status statusCode) - throws IOException { - - ClientReadStatusProto.newBuilder() - .setStatus(statusCode) - .build() - .writeDelimitedTo(out); - - out.flush(); - } - - /** - * File name to print when accessing a block directly (from servlets) - * @param s Address of the block location - * @param poolId Block pool ID of the block - * @param blockId Block ID of the block - * @return string that has a file name for debug purposes - */ - public static String getFileName(final InetSocketAddress s, - final String poolId, final long blockId) { - return s.toString() + ":" + poolId + ":" + blockId; - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return BlockReaderUtil.readAll(this, buf, offset, len); - } - - @Override - public void readFully(byte[] buf, int off, int len) throws IOException { - BlockReaderUtil.readFully(this, buf, off, len); - } - - /** - * Create a new BlockReader specifically to satisfy a read. - * This method also sends the OP_READ_BLOCK request. - * - * @param file File location - * @param block The block object - * @param blockToken The block token for security - * @param startOffset The read offset, relative to block head - * @param len The number of bytes to read - * @param verifyChecksum Whether to verify checksum - * @param clientName Client name - * @param peer The Peer to use - * @param datanodeID The DatanodeID this peer is connected to - * @return New BlockReader instance, or null on error. 
- */ - public static BlockReader newBlockReader(String file, - ExtendedBlock block, - Token<BlockTokenIdentifier> blockToken, - long startOffset, long len, - boolean verifyChecksum, - String clientName, - Peer peer, DatanodeID datanodeID, - PeerCache peerCache, - CachingStrategy cachingStrategy) throws IOException { - // in and out will be closed when sock is closed (by the caller) - final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - peer.getOutputStream())); - new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, - verifyChecksum, cachingStrategy); - - // - // Get bytes in block - // - DataInputStream in = new DataInputStream(peer.getInputStream()); - - BlockOpResponseProto status = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); - checkSuccess(status, peer, block, file); - ReadOpChecksumInfoProto checksumInfo = - status.getReadOpChecksumInfo(); - DataChecksum checksum = DataTransferProtoUtil.fromProto( - checksumInfo.getChecksum()); - //Warning when we get CHECKSUM_NULL? - - // Read the first chunk offset. - long firstChunkOffset = checksumInfo.getChunkOffset(); - - if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || - firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) { - throw new IOException("BlockReader: error in first chunk offset (" + - firstChunkOffset + ") startOffset is " + - startOffset + " for file " + file); - } - - return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), - checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, - datanodeID, peerCache); - } - - static void checkSuccess( - BlockOpResponseProto status, Peer peer, - ExtendedBlock block, String file) - throws IOException { - String logInfo = "for OP_READ_BLOCK" - + ", self=" + peer.getLocalAddressString() - + ", remote=" + peer.getRemoteAddressString() - + ", for file " + file - + ", for pool " + block.getBlockPoolId() - + " block " + block.getBlockId() + "_" + block.getGenerationStamp(); - DataTransferProtoUtil.checkBlockOpStatus(status, logInfo); - } - - @Override - public int available() throws IOException { - // An optimistic estimate of how much data is available - // to us without doing network I/O. - return DFSClient.TCP_WINDOW_SIZE; - } - - @Override - public boolean isLocal() { - return isLocal; - } - - @Override - public boolean isShortCircuit() { - return false; - } - - @Override - public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { - return null; - } -}
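[Illustrative aside, not part of the commit: RemoteBlockReader2.readNextPacket() above and the PacketReceiver/PacketHeader classes deleted below all rely on the same wire framing for a data-transfer packet, as documented in PacketReceiver#doRead(): a 4-byte payload length PLEN (which counts its own four bytes plus the checksums and data, but not the header), a 2-byte header length HLEN, the protobuf-encoded header, then checksums and data. The minimal sketch below shows only that length-prefix framing in plain java.nio; the class name PacketFramingSketch is invented for the example, and the header is treated as opaque bytes rather than a real PacketHeaderProto.]

import java.nio.ByteBuffer;

// Sketch of the packet framing: PLEN (4B) | HLEN (2B) | HEADER (protobuf) | CHECKSUMS | DATA,
// where PLEN = 4 + len(CHECKSUMS) + len(DATA), i.e. it includes its own encoded length
// but not HLEN or the header.
public class PacketFramingSketch {

  static void splitPacket(ByteBuffer pkt) {
    int plen = pkt.getInt();       // payload length, includes its own 4 bytes
    short hlen = pkt.getShort();   // length of the serialized header
    byte[] headerBytes = new byte[hlen];
    pkt.get(headerBytes);          // the real code parses these bytes as a PacketHeaderProto
    // Everything that remains is checksums followed by data; the split between the two
    // comes from the dataLen field inside the parsed header.
    int checksumsPlusData = plen - 4;
    System.out.println("header=" + hlen + "B, checksums+data=" + checksumsPlusData + "B");
  }

  public static void main(String[] args) {
    byte[] header = new byte[0];   // stand-in for an encoded packet header
    byte[] data = {1, 2, 3};       // toy payload, no checksums
    ByteBuffer pkt = ByteBuffer.allocate(4 + 2 + header.length + data.length);
    pkt.putInt(4 + data.length);          // PLEN counts itself plus checksums+data
    pkt.putShort((short) header.length);  // HLEN
    pkt.put(header).put(data);
    pkt.flip();
    splitPacket(pkt);              // prints: header=0B, checksums+data=3B
  }
}

The real PacketReceiver additionally re-slices a single reusable buffer into separate checksum and data views and rejects packets larger than its 16 MB MAX_PACKET_SIZE cap.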
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java deleted file mode 100644 index c9966a7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto; -import org.apache.hadoop.hdfs.util.ByteBufferOutputStream; - -import com.google.common.base.Preconditions; -import com.google.common.primitives.Shorts; -import com.google.common.primitives.Ints; -import com.google.protobuf.InvalidProtocolBufferException; - -/** - * Header data for each packet that goes through the read/write pipelines. - * Includes all of the information about the packet, excluding checksums and - * actual data. - * - * This data includes: - * - the offset in bytes into the HDFS block of the data in this packet - * - the sequence number of this packet in the pipeline - * - whether or not this is the last packet in the pipeline - * - the length of the data in this packet - * - whether or not this packet should be synced by the DNs. - * - * When serialized, this header is written out as a protocol buffer, preceded - * by a 4-byte integer representing the full packet length, and a 2-byte short - * representing the header length. 
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class PacketHeader {
- private static final int MAX_PROTO_SIZE =
- PacketHeaderProto.newBuilder()
- .setOffsetInBlock(0)
- .setSeqno(0)
- .setLastPacketInBlock(false)
- .setDataLen(0)
- .setSyncBlock(false)
- .build().getSerializedSize();
- public static final int PKT_LENGTHS_LEN =
- Ints.BYTES + Shorts.BYTES;
- public static final int PKT_MAX_HEADER_LEN =
- PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
-
- private int packetLen;
- private PacketHeaderProto proto;
-
- public PacketHeader() {
- }
-
- public PacketHeader(int packetLen, long offsetInBlock, long seqno,
- boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
- this.packetLen = packetLen;
- Preconditions.checkArgument(packetLen >= Ints.BYTES,
- "packet len %s should always be at least 4 bytes",
- packetLen);
-
- PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
- .setOffsetInBlock(offsetInBlock)
- .setSeqno(seqno)
- .setLastPacketInBlock(lastPacketInBlock)
- .setDataLen(dataLen);
-
- if (syncBlock) {
- // Only set syncBlock if it is specified.
- // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
- // because it changes the length of the packet header, and BlockReceiver
- // in that version did not support variable-length headers.
- builder.setSyncBlock(syncBlock);
- }
-
- proto = builder.build();
- }
-
- public int getDataLen() {
- return proto.getDataLen();
- }
-
- public boolean isLastPacketInBlock() {
- return proto.getLastPacketInBlock();
- }
-
- public long getSeqno() {
- return proto.getSeqno();
- }
-
- public long getOffsetInBlock() {
- return proto.getOffsetInBlock();
- }
-
- public int getPacketLen() {
- return packetLen;
- }
-
- public boolean getSyncBlock() {
- return proto.getSyncBlock();
- }
-
- @Override
- public String toString() {
- return "PacketHeader with packetLen=" + packetLen +
- " header data: " +
- proto.toString();
- }
-
- public void setFieldsFromData(
- int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
- this.packetLen = packetLen;
- proto = PacketHeaderProto.parseFrom(headerData);
- }
-
- public void readFields(ByteBuffer buf) throws IOException {
- packetLen = buf.getInt();
- short protoLen = buf.getShort();
- byte[] data = new byte[protoLen];
- buf.get(data);
- proto = PacketHeaderProto.parseFrom(data);
- }
-
- public void readFields(DataInputStream in) throws IOException {
- this.packetLen = in.readInt();
- short protoLen = in.readShort();
- byte[] data = new byte[protoLen];
- in.readFully(data);
- proto = PacketHeaderProto.parseFrom(data);
- }
-
- /**
- * @return the number of bytes necessary to write out this header,
- * including the length-prefixing of the payload and header
- */
- public int getSerializedSize() {
- return PKT_LENGTHS_LEN + proto.getSerializedSize();
- }
-
- /**
- * Write the header into the buffer.
- * This requires that PKT_HEADER_LEN bytes are available.
- */ - public void putInBuffer(final ByteBuffer buf) { - assert proto.getSerializedSize() <= MAX_PROTO_SIZE - : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize(); - try { - buf.putInt(packetLen); - buf.putShort((short) proto.getSerializedSize()); - proto.writeTo(new ByteBufferOutputStream(buf)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public void write(DataOutputStream out) throws IOException { - assert proto.getSerializedSize() <= MAX_PROTO_SIZE - : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize(); - out.writeInt(packetLen); - out.writeShort(proto.getSerializedSize()); - proto.writeTo(out); - } - - public byte[] getBytes() { - ByteBuffer buf = ByteBuffer.allocate(getSerializedSize()); - putInBuffer(buf); - return buf.array(); - } - - /** - * Perform a sanity check on the packet, returning true if it is sane. - * @param lastSeqNo the previous sequence number received - we expect the current - * sequence number to be larger by 1. - */ - public boolean sanityCheck(long lastSeqNo) { - // We should only have a non-positive data length for the last packet - if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false; - // The last packet should not contain data - if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false; - // Seqnos should always increase by 1 with each packet received - if (proto.getSeqno() != lastSeqNo + 1) return false; - return true; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof PacketHeader)) return false; - PacketHeader other = (PacketHeader)o; - return this.proto.equals(other.proto); - } - - @Override - public int hashCode() { - return (int)proto.getSeqno(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java deleted file mode 100644 index 3045a13..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java +++ /dev/null @@ -1,310 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.Closeable;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.util.DirectBufferPool;
-import org.apache.hadoop.io.IOUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-
-/**
- * Class to handle reading packets one-at-a-time from the wire.
- * These packets are used both for reading and writing data to/from
- * DataNodes.
- */
-@InterfaceAudience.Private
-public class PacketReceiver implements Closeable {
-
- /**
- * The max size of any single packet. This prevents OOMEs when
- * invalid data is sent.
- */
- private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
-
- static final Log LOG = LogFactory.getLog(PacketReceiver.class);
-
- private static final DirectBufferPool bufferPool = new DirectBufferPool();
- private final boolean useDirectBuffers;
-
- /**
- * The entirety of the most recently read packet.
- * The first PKT_LENGTHS_LEN bytes of this buffer are the
- * length prefixes.
- */
- private ByteBuffer curPacketBuf = null;
-
- /**
- * A slice of {@link #curPacketBuf} which contains just the checksums.
- */
- private ByteBuffer curChecksumSlice = null;
-
- /**
- * A slice of {@link #curPacketBuf} which contains just the data.
- */
- private ByteBuffer curDataSlice = null;
-
- /**
- * The packet header of the most recently read packet.
- */
- private PacketHeader curHeader;
-
- public PacketReceiver(boolean useDirectBuffers) {
- this.useDirectBuffers = useDirectBuffers;
- reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
- }
-
- public PacketHeader getHeader() {
- return curHeader;
- }
-
- public ByteBuffer getDataSlice() {
- return curDataSlice;
- }
-
- public ByteBuffer getChecksumSlice() {
- return curChecksumSlice;
- }
-
- /**
- * Reads all of the data for the next packet into the appropriate buffers.
- *
- * The data slice and checksum slice members will be set to point to the
- * user data and corresponding checksums. The header will be parsed and
- * set.
- */
- public void receiveNextPacket(ReadableByteChannel in) throws IOException {
- doRead(in, null);
- }
-
- /**
- * @see #receiveNextPacket(ReadableByteChannel)
- */
- public void receiveNextPacket(InputStream in) throws IOException {
- doRead(null, in);
- }
-
- private void doRead(ReadableByteChannel ch, InputStream in)
- throws IOException {
- // Each packet looks like:
- // PLEN HLEN HEADER CHECKSUMS DATA
- // 32-bit 16-bit <protobuf> <variable length>
- //
- // PLEN: Payload length
- // = length(PLEN) + length(CHECKSUMS) + length(DATA)
- // This length includes its own encoded length in
- // the sum for historical reasons.
- //
- // HLEN: Header length
- // = length(HEADER)
- //
- // HEADER: the actual packet header fields, encoded in protobuf
- // CHECKSUMS: the crcs for the data chunk. May be missing if
- // checksums were not requested
- // DATA the actual block data
- Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-
- curPacketBuf.clear();
- curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
- doReadFully(ch, in, curPacketBuf);
- curPacketBuf.flip();
- int payloadLen = curPacketBuf.getInt();
-
- if (payloadLen < Ints.BYTES) {
- // The "payload length" includes its own length.
Therefore it - // should never be less than 4 bytes - throw new IOException("Invalid payload length " + - payloadLen); - } - int dataPlusChecksumLen = payloadLen - Ints.BYTES; - int headerLen = curPacketBuf.getShort(); - if (headerLen < 0) { - throw new IOException("Invalid header length " + headerLen); - } - - if (LOG.isTraceEnabled()) { - LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen + - " headerLen = " + headerLen); - } - - // Sanity check the buffer size so we don't allocate too much memory - // and OOME. - int totalLen = payloadLen + headerLen; - if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) { - throw new IOException("Incorrect value for packet payload size: " + - payloadLen); - } - - // Make sure we have space for the whole packet, and - // read it. - reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN + - dataPlusChecksumLen + headerLen); - curPacketBuf.clear(); - curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN); - curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN + - dataPlusChecksumLen + headerLen); - doReadFully(ch, in, curPacketBuf); - curPacketBuf.flip(); - curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN); - - // Extract the header from the front of the buffer (after the length prefixes) - byte[] headerBuf = new byte[headerLen]; - curPacketBuf.get(headerBuf); - if (curHeader == null) { - curHeader = new PacketHeader(); - } - curHeader.setFieldsFromData(payloadLen, headerBuf); - - // Compute the sub-slices of the packet - int checksumLen = dataPlusChecksumLen - curHeader.getDataLen(); - if (checksumLen < 0) { - throw new IOException("Invalid packet: data length in packet header " + - "exceeds data length received. dataPlusChecksumLen=" + - dataPlusChecksumLen + " header: " + curHeader); - } - - reslicePacket(headerLen, checksumLen, curHeader.getDataLen()); - } - - /** - * Rewrite the last-read packet on the wire to the given output stream. - */ - public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException { - Preconditions.checkState(!useDirectBuffers, - "Currently only supported for non-direct buffers"); - mirrorOut.write(curPacketBuf.array(), - curPacketBuf.arrayOffset(), - curPacketBuf.remaining()); - } - - - private static void doReadFully(ReadableByteChannel ch, InputStream in, - ByteBuffer buf) throws IOException { - if (ch != null) { - readChannelFully(ch, buf); - } else { - Preconditions.checkState(!buf.isDirect(), - "Must not use direct buffers with InputStream API"); - IOUtils.readFully(in, buf.array(), - buf.arrayOffset() + buf.position(), - buf.remaining()); - buf.position(buf.position() + buf.remaining()); - } - } - - private void reslicePacket( - int headerLen, int checksumsLen, int dataLen) { - // Packet structure (refer to doRead() for details): - // PLEN HLEN HEADER CHECKSUMS DATA - // 32-bit 16-bit <protobuf> <variable length> - // |--- lenThroughHeader ----| - // |----------- lenThroughChecksums ----| - // |------------------- lenThroughData ------| - int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen; - int lenThroughChecksums = lenThroughHeader + checksumsLen; - int lenThroughData = lenThroughChecksums + dataLen; - - assert dataLen >= 0 : "invalid datalen: " + dataLen; - assert curPacketBuf.position() == lenThroughHeader; - assert curPacketBuf.limit() == lenThroughData : - "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen + - " rem=" + curPacketBuf.remaining(); - - // Slice the checksums. 
- curPacketBuf.position(lenThroughHeader); - curPacketBuf.limit(lenThroughChecksums); - curChecksumSlice = curPacketBuf.slice(); - - // Slice the data. - curPacketBuf.position(lenThroughChecksums); - curPacketBuf.limit(lenThroughData); - curDataSlice = curPacketBuf.slice(); - - // Reset buffer to point to the entirety of the packet (including - // length prefixes) - curPacketBuf.position(0); - curPacketBuf.limit(lenThroughData); - } - - - private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf) - throws IOException { - while (buf.remaining() > 0) { - int n = ch.read(buf); - if (n < 0) { - throw new IOException("Premature EOF reading from " + ch); - } - } - } - - private void reallocPacketBuf(int atLeastCapacity) { - // Realloc the buffer if this packet is longer than the previous - // one. - if (curPacketBuf == null || - curPacketBuf.capacity() < atLeastCapacity) { - ByteBuffer newBuf; - if (useDirectBuffers) { - newBuf = bufferPool.getBuffer(atLeastCapacity); - } else { - newBuf = ByteBuffer.allocate(atLeastCapacity); - } - // If reallocing an existing buffer, copy the old packet length - // prefixes over - if (curPacketBuf != null) { - curPacketBuf.flip(); - newBuf.put(curPacketBuf); - } - - returnPacketBufToPool(); - curPacketBuf = newBuf; - } - } - - private void returnPacketBufToPool() { - if (curPacketBuf != null && curPacketBuf.isDirect()) { - bufferPool.returnBuffer(curPacketBuf); - curPacketBuf = null; - } - } - - @Override // Closeable - public void close() { - returnPacketBufToPool(); - } - - @Override - protected void finalize() throws Throwable { - try { - // just in case it didn't get closed, we - // may as well still try to return the buffer - returnPacketBufToPool(); - } finally { - super.finalize(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java deleted file mode 100644 index 31d4dcc..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.util; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * OutputStream that writes into a {@link ByteBuffer}. 
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class ByteBufferOutputStream extends OutputStream {
-
- private final ByteBuffer buf;
-
- public ByteBufferOutputStream(ByteBuffer buf) {
- this.buf = buf;
- }
-
- @Override
- public void write(int b) throws IOException {
- buf.put((byte)b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- buf.put(b, off, len);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/826ae1c2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
index 8dd3d6f..5ff343a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
@@ -24,10 +24,10 @@ import static org.mockito.Mockito.verify;
 import java.util.List;
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -41,7 +41,7 @@ public class TestClientBlockVerification {
 static LocatedBlock testBlock = null;
 static {
- ((Log4JLogger)RemoteBlockReader2.LOG).getLogger().setLevel(Level.ALL);
+ GenericTestUtils.setLogLevel(RemoteBlockReader2.LOG, Level.ALL);
 }
 @BeforeClass
 public static void setupCluster() throws Exception {
