http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/main/java/org/apache/hadoop/io/file/tfile/DTFile.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/hadoop/io/file/tfile/DTFile.java b/library/src/main/java/org/apache/hadoop/io/file/tfile/DTFile.java new file mode 100644 index 0000000..f1c87ba --- /dev/null +++ b/library/src/main/java/org/apache/hadoop/io/file/tfile/DTFile.java @@ -0,0 +1,2399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.io.file.tfile; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Comparator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.io.BoundedByteArrayOutputStream; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.file.tfile.ByteArray; +import org.apache.hadoop.io.file.tfile.MetaBlockAlreadyExists; +import org.apache.hadoop.io.file.tfile.MetaBlockDoesNotExist; +import org.apache.hadoop.io.file.tfile.RawComparable; +import org.apache.hadoop.io.file.tfile.Utils; +import org.apache.hadoop.io.file.tfile.DTBCFile.Reader.BlockReader; +import org.apache.hadoop.io.file.tfile.DTBCFile.Writer.BlockAppender; +import org.apache.hadoop.io.file.tfile.Utils.Version; +import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder; +import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder; +import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator; +import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator; +import org.apache.hadoop.io.serializer.JavaSerializationComparator; + +/** + * <ul> + * <li>The file format of DTFile is same as {@link TFile} with different reader implementation. 
+ * It reads data block by block and caches the binary block data in memory to speed up random reads. + * + * <li>The public API of {@link Reader} is the same as that of the {@link org.apache.hadoop.io.file.tfile.TFile.Reader} implementation in {@link TFile}. + * In addition, it provides the getBlockBuffer(), getKeyOffset(), getKeyLength(), getValueOffset() and getValueLength() methods + * that expose the raw block, key and value data to the user, avoiding unnecessary internal/external data copies. + * + * <li>In performance tests, it shows no difference in sequential reads and is 20x faster in random reads (if most of them hit memory). + * </ul> + * + * A TFile is a container of key-value pairs. Both keys and values are type-less + * bytes. Keys are restricted to 64KB, value length is not restricted + * (practically limited to the available disk storage). TFile further provides + * the following features: + * <ul> + * <li>Block Compression. + * <li>Named meta data blocks. + * <li>Sorted or unsorted keys. + * <li>Seek by key or by file offset. + * </ul> + * The memory footprint of a TFile includes the following: + * <ul> + * <li>Some constant overhead of reading or writing a compressed block. + * <ul> + * <li>Each compressed block requires one compression/decompression codec for + * I/O. + * <li>Temporary space to buffer the key. + * <li>Temporary space to buffer the value (for TFile.Writer only). Values are + * chunk encoded, so that we buffer at most one chunk of user data. By default, + * the chunk buffer is 1MB. Reading a chunked value does not require additional + * memory. + * </ul> + * <li>TFile index, which is proportional to the total number of Data Blocks. + * The total amount of memory needed to hold the index can be estimated as + * (56+AvgKeySize)*NumBlocks. + * <li>MetaBlock index, which is proportional to the total number of Meta + * Blocks. The total amount of memory needed to hold the index for Meta Blocks + * can be estimated as (40+AvgMetaBlockName)*NumMetaBlock. 
+ * </ul> + * <p> + * The behavior of TFile can be customized by the following variables through + * Configuration: + * <ul> + * <li><b>tfile.io.chunk.size</b>: Value chunk size. Integer (in bytes). Defaults + * to 1MB. Values whose length is less than the chunk size are guaranteed to have a + * known value length at read time (see + * {@link DTFile.Reader.Scanner.Entry#isValueLengthKnown()}). + * <li><b>tfile.fs.output.buffer.size</b>: Buffer size used for + * FSDataOutputStream. Integer (in bytes). Defaults to 256KB. + * <li><b>tfile.fs.input.buffer.size</b>: Buffer size used for + * FSDataInputStream. Integer (in bytes). Defaults to 256KB. + * </ul> + * <p> + * Suggestions on performance optimization. + * <ul> + * <li>Minimum block size. We recommend a setting of minimum block size between + * 256KB and 1MB for general usage. Larger block size is preferred if files are + * primarily for sequential access. However, it would lead to inefficient random + * access (because there is more data to decompress). Smaller blocks are good + * for random access, but require more memory to hold the block index, and may + * be slower to create (because we must flush the compressor stream at the + * conclusion of each data block, which leads to an FS I/O flush). Further, due + * to the internal caching in Compression codec, the smallest possible block + * size would be around 20KB-30KB. + * <li>The current implementation does not offer true multi-threading for + * reading. The implementation uses FSDataInputStream seek()+read(), which is + * shown to be much faster than positioned-read call in single thread mode. + * However, it also means that if multiple threads attempt to access the same + * TFile (using multiple scanners) simultaneously, the actual I/O is carried out + * sequentially even if they access different DFS blocks. + * <li>Compression codec. Use "none" if the data is not very compressible (by + * compressible, I mean a compression ratio of at least 2:1). 
Generally, use "lzo" + * as the starting point for experimenting. "gz" overs slightly better + * compression ratio over "lzo" but requires 4x CPU to compress and 2x CPU to + * decompress, comparing to "lzo". + * <li>File system buffering, if the underlying FSDataInputStream and + * FSDataOutputStream is already adequately buffered; or if applications + * reads/writes keys and values in large buffers, we can reduce the sizes of + * input/output buffering in TFile layer by setting the configuration parameters + * "tfile.fs.input.buffer.size" and "tfile.fs.output.buffer.size". + * </ul> + * + * Some design rationale behind TFile can be found at <a + * href=https://issues.apache.org/jira/browse/HADOOP-3315>Hadoop-3315</a>. + * + * @since 2.0.0 + */ [email protected] [email protected] +public class DTFile { + static final Log LOG = LogFactory.getLog(DTFile.class); + + private static final String CHUNK_BUF_SIZE_ATTR = "tfile.io.chunk.size"; + private static final String FS_INPUT_BUF_SIZE_ATTR = + "tfile.fs.input.buffer.size"; + private static final String FS_OUTPUT_BUF_SIZE_ATTR = + "tfile.fs.output.buffer.size"; + + public static final int DEFAULT_INPUT_FS_BUF_SIZE = 256 * 1024; + + static int getChunkBufferSize(Configuration conf) { + int ret = conf.getInt(CHUNK_BUF_SIZE_ATTR, 1024 * 1024); + return (ret > 0) ? 
ret : 1024 * 1024; + } + + static int getFSInputBufferSize(Configuration conf) { + int buffserSize = conf.getInt(FS_INPUT_BUF_SIZE_ATTR, DEFAULT_INPUT_FS_BUF_SIZE); + if (buffserSize <= 0) + buffserSize = DEFAULT_INPUT_FS_BUF_SIZE; + return buffserSize; + } + + static int getFSOutputBufferSize(Configuration conf) { + return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024); + } + + private static final int MAX_KEY_SIZE = 64 * 1024; // 64KB + static final Version API_VERSION = new Version((short) 1, (short) 0); + + /** compression: gzip */ + public static final String COMPRESSION_GZ = "gz"; + /** compression: lzo */ + public static final String COMPRESSION_LZO = "lzo"; + /** compression: none */ + public static final String COMPRESSION_NONE = "none"; + /** comparator: memcmp */ + public static final String COMPARATOR_MEMCMP = "memcmp"; + /** comparator prefix: java class */ + public static final String COMPARATOR_JCLASS = "jclass:"; + + /** + * Make a raw comparator from a string name. + * + * @param name + * Comparator name + * @return A RawComparable comparator. + */ + static public Comparator<RawComparable> makeComparator(String name) { + return TFileMeta.makeComparator(name); + } + + // Prevent the instantiation of TFiles + private DTFile() { + // nothing + } + + /** + * Get names of supported compression algorithms. The names are acceptable by + * TFile.Writer. + * + * @return Array of strings, each represents a supported compression + * algorithm. Currently, the following compression algorithms are + * supported. + * <ul> + * <li>"none" - No compression. + * <li>"lzo" - LZO compression. + * <li>"gz" - GZIP compression. + * </ul> + */ + public static String[] getSupportedCompressionAlgorithms() { + return Compression.getSupportedAlgorithms(); + } + + /** + * TFile Writer. + */ + @InterfaceStability.Evolving + public static class Writer implements Closeable { + // minimum compressed size for a block. + private final int sizeMinBlock; + + // Meta blocks. 
+ final TFileIndex tfileIndex; + final TFileMeta tfileMeta; + + // reference to the underlying BCFile. + private DTBCFile.Writer writerBCF; + + // current data block appender. + BlockAppender blkAppender; + long blkRecordCount; + + // buffers for caching the key. + BoundedByteArrayOutputStream currentKeyBufferOS; + BoundedByteArrayOutputStream lastKeyBufferOS; + + // buffer used by chunk codec + private byte[] valueBuffer; + + /** + * Writer states. The state always transits in circles: READY -> IN_KEY -> + * END_KEY -> IN_VALUE -> READY. + */ + private enum State { + READY, // Ready to start a new key-value pair insertion. + IN_KEY, // In the middle of key insertion. + END_KEY, // Key insertion complete, ready to insert value. + IN_VALUE, // In value insertion. + // ERROR, // Error encountered, cannot continue. + CLOSED, // TFile already closed. + }; + + // current state of Writer. + State state = State.READY; + Configuration conf; + long errorCount = 0; + + /** + * Constructor + * + * @param fsdos + * output stream for writing. Must be at position 0. + * @param minBlockSize + * Minimum compressed block size in bytes. A compression block will + * not be closed until it reaches this size except for the last + * block. + * @param compressName + * Name of the compression algorithm. Must be one of the strings + * returned by {@link DTFile#getSupportedCompressionAlgorithms()}. + * @param comparator + * Leave comparator as null or empty string if TFile is not sorted. + * Otherwise, provide the string name for the comparison algorithm + * for keys. Two kinds of comparators are supported. + * <ul> + * <li>Algorithmic comparator: binary comparators that is language + * independent. Currently, only "memcmp" is supported. + * <li>Language-specific comparator: binary comparators that can + * only be constructed in specific language. For Java, the syntax + * is "jclass:", followed by the class name of the RawComparator. 
+ * Currently, we only support RawComparators that can be + * constructed through the default constructor (with no + * parameters). Parameterized RawComparators such as + * {@link WritableComparator} or + * {@link JavaSerializationComparator} may not be directly used. + * One should write a wrapper class that inherits from such classes + * and use its default constructor to perform proper + * initialization. + * </ul> + * @param conf + * The configuration object. + * @throws IOException + */ + public Writer(FSDataOutputStream fsdos, int minBlockSize, + String compressName, String comparator, Configuration conf) + throws IOException { + sizeMinBlock = minBlockSize; + tfileMeta = new TFileMeta(comparator); + tfileIndex = new TFileIndex(tfileMeta.getComparator()); + + writerBCF = new DTBCFile.Writer(fsdos, compressName, conf); + currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE); + lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE); + this.conf = conf; + } + + /** + * Close the Writer. Resources will be released regardless of the exceptions + * being thrown. Future close calls will have no effect. + * + * The underlying FSDataOutputStream is not closed. + */ + @Override + public void close() throws IOException { + if ((state == State.CLOSED)) { + return; + } + try { + // First try the normal finish. + // Terminate upon the first Exception. 
+ if (errorCount == 0) { + if (state != State.READY) { + throw new IllegalStateException( + "Cannot close TFile in the middle of key-value insertion."); + } + + finishDataBlock(true); + + // first, write out data:TFile.meta + BlockAppender outMeta = + writerBCF + .prepareMetaBlock(TFileMeta.BLOCK_NAME, COMPRESSION_NONE); + try { + tfileMeta.write(outMeta); + } finally { + outMeta.close(); + } + + // second, write out data:TFile.index + BlockAppender outIndex = + writerBCF.prepareMetaBlock(TFileIndex.BLOCK_NAME); + try { + tfileIndex.write(outIndex); + } finally { + outIndex.close(); + } + + writerBCF.close(); + } + } finally { + IOUtils.cleanup(LOG, blkAppender, writerBCF); + blkAppender = null; + writerBCF = null; + state = State.CLOSED; + } + } + + /** + * Adding a new key-value pair to the TFile. This is synonymous to + * append(key, 0, key.length, value, 0, value.length) + * + * @param key + * Buffer for key. + * @param value + * Buffer for value. + * @throws IOException + */ + public void append(byte[] key, byte[] value) throws IOException { + append(key, 0, key.length, value, 0, value.length); + } + + /** + * Adding a new key-value pair to TFile. + * + * @param key + * buffer for key. + * @param koff + * offset in key buffer. + * @param klen + * length of key. + * @param value + * buffer for value. + * @param voff + * offset in value buffer. + * @param vlen + * length of value. + * @throws IOException + * Upon IO errors. + * <p> + * If an exception is thrown, the TFile will be in an inconsistent + * state. 
The only legitimate call after that would be close + */ + public void append(byte[] key, int koff, int klen, byte[] value, int voff, + int vlen) throws IOException { + if ((koff | klen | (koff + klen) | (key.length - (koff + klen))) < 0) { + throw new IndexOutOfBoundsException( + "Bad key buffer offset-length combination."); + } + + if ((voff | vlen | (voff + vlen) | (value.length - (voff + vlen))) < 0) { + throw new IndexOutOfBoundsException( + "Bad value buffer offset-length combination."); + } + + try { + DataOutputStream dosKey = prepareAppendKey(klen); + try { + ++errorCount; + dosKey.write(key, koff, klen); + --errorCount; + } finally { + dosKey.close(); + } + + DataOutputStream dosValue = prepareAppendValue(vlen); + try { + ++errorCount; + dosValue.write(value, voff, vlen); + --errorCount; + } finally { + dosValue.close(); + } + } finally { + state = State.READY; + } + } + + /** + * Helper class to register key after close call on key append stream. + */ + private class KeyRegister extends DataOutputStream { + private final int expectedLength; + private boolean closed = false; + + public KeyRegister(int len) { + super(currentKeyBufferOS); + if (len >= 0) { + currentKeyBufferOS.reset(len); + } else { + currentKeyBufferOS.reset(); + } + expectedLength = len; + } + + @Override + public void close() throws IOException { + if (closed == true) { + return; + } + + try { + ++errorCount; + byte[] key = currentKeyBufferOS.getBuffer(); + int len = currentKeyBufferOS.size(); + /** + * verify length. 
+ */ + if (expectedLength >= 0 && expectedLength != len) { + throw new IOException("Incorrect key length: expected=" + + expectedLength + " actual=" + len); + } + + Utils.writeVInt(blkAppender, len); + blkAppender.write(key, 0, len); + if (tfileIndex.getFirstKey() == null) { + tfileIndex.setFirstKey(key, 0, len); + } + + if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) { + byte[] lastKey = lastKeyBufferOS.getBuffer(); + int lastLen = lastKeyBufferOS.size(); + if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0, + lastLen) < 0) { + throw new IOException("Keys are not added in sorted order"); + } + } + + BoundedByteArrayOutputStream tmp = currentKeyBufferOS; + currentKeyBufferOS = lastKeyBufferOS; + lastKeyBufferOS = tmp; + --errorCount; + } finally { + closed = true; + state = State.END_KEY; + } + } + } + + /** + * Helper class to register value after close call on value append stream. + */ + private class ValueRegister extends DataOutputStream { + private boolean closed = false; + + public ValueRegister(OutputStream os) { + super(os); + } + + // Avoiding flushing call to down stream. + @Override + public void flush() { + // do nothing + } + + @Override + public void close() throws IOException { + if (closed == true) { + return; + } + + try { + ++errorCount; + super.close(); + blkRecordCount++; + // bump up the total record count in the whole file + tfileMeta.incRecordCount(); + finishDataBlock(false); + --errorCount; + } finally { + closed = true; + state = State.READY; + } + } + } + + /** + * Obtain an output stream for writing a key into TFile. This may only be + * called when there is no active Key appending stream or value appending + * stream. + * + * @param length + * The expected length of the key. If length of the key is not + * known, set length = -1. Otherwise, the application must write + * exactly as many bytes as specified here before calling close on + * the returned output stream. + * @return The key appending output stream. 
+ * @throws IOException + * + */ + public DataOutputStream prepareAppendKey(int length) throws IOException { + if (state != State.READY) { + throw new IllegalStateException("Incorrect state to start a new key: " + + state.name()); + } + + initDataBlock(); + DataOutputStream ret = new KeyRegister(length); + state = State.IN_KEY; + return ret; + } + + /** + * Obtain an output stream for writing a value into TFile. This may only be + * called right after a key appending operation (the key append stream must + * be closed). + * + * @param length + * The expected length of the value. If length of the value is not + * known, set length = -1. Otherwise, the application must write + * exactly as many bytes as specified here before calling close on + * the returned output stream. Advertising the value size up-front + * guarantees that the value is encoded in one chunk, and avoids + * intermediate chunk buffering. + * @throws IOException + * + */ + public DataOutputStream prepareAppendValue(int length) throws IOException { + if (state != State.END_KEY) { + throw new IllegalStateException( + "Incorrect state to start a new value: " + state.name()); + } + + DataOutputStream ret; + + // unknown length + if (length < 0) { + if (valueBuffer == null) { + valueBuffer = new byte[getChunkBufferSize(conf)]; + } + ret = new ValueRegister(new ChunkEncoder(blkAppender, valueBuffer)); + } else { + ret = + new ValueRegister(new Chunk.SingleChunkEncoder(blkAppender, length)); + } + + state = State.IN_VALUE; + return ret; + } + + /** + * Obtain an output stream for creating a meta block. This function may not + * be called when there is a key append stream or value append stream + * active. No more key-value insertion is allowed after a meta data block + * has been added to TFile. + * + * @param name + * Name of the meta block. + * @param compressName + * Name of the compression algorithm to be used. 
Must be one of the + * strings returned by + * {@link DTFile#getSupportedCompressionAlgorithms()}. + * @return A DataOutputStream that can be used to write Meta Block data. + * Closing the stream would signal the ending of the block. + * @throws IOException + * @throws MetaBlockAlreadyExists + * the Meta Block with the same name already exists. + */ + public DataOutputStream prepareMetaBlock(String name, String compressName) + throws IOException, MetaBlockAlreadyExists { + if (state != State.READY) { + throw new IllegalStateException( + "Incorrect state to start a Meta Block: " + state.name()); + } + + finishDataBlock(true); + DataOutputStream outputStream = + writerBCF.prepareMetaBlock(name, compressName); + return outputStream; + } + + /** + * Obtain an output stream for creating a meta block. This function may not + * be called when there is a key append stream or value append stream + * active. No more key-value insertion is allowed after a meta data block + * has been added to TFile. Data will be compressed using the default + * compressor as defined in Writer's constructor. + * + * @param name + * Name of the meta block. + * @return A DataOutputStream that can be used to write Meta Block data. + * Closing the stream would signal the ending of the block. + * @throws IOException + * @throws MetaBlockAlreadyExists + * the Meta Block with the same name already exists. + */ + public DataOutputStream prepareMetaBlock(String name) throws IOException, + MetaBlockAlreadyExists { + if (state != State.READY) { + throw new IllegalStateException( + "Incorrect state to start a Meta Block: " + state.name()); + } + + finishDataBlock(true); + return writerBCF.prepareMetaBlock(name); + } + + /** + * Check if we need to start a new data block. 
+ * + * @throws IOException + */ + private void initDataBlock() throws IOException { + // for each new block, get a new appender + if (blkAppender == null) { + blkAppender = writerBCF.prepareDataBlock(); + } + } + + /** + * Close the current data block if necessary. + * + * @param bForceFinish + * Force the closure regardless of the block size. + * @throws IOException + */ + void finishDataBlock(boolean bForceFinish) throws IOException { + if (blkAppender == null) { + return; + } + + // exceeded the size limit, do the compression and finish the block + if (bForceFinish || blkAppender.getCompressedSize() >= sizeMinBlock) { + // keep tracks of the last key of each data block, no padding + // for now + TFileIndexEntry keyLast = + new TFileIndexEntry(lastKeyBufferOS.getBuffer(), 0, lastKeyBufferOS + .size(), blkRecordCount); + tfileIndex.addEntry(keyLast); + // close the appender + blkAppender.close(); + blkAppender = null; + blkRecordCount = 0; + } + } + } + + /** + * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner. + * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()} + * ) , a portion of TFile based on byte offsets ( + * {@link Reader#createScannerByByteRange(long, long)}), or a portion of TFile with keys + * fall in a certain key range (for sorted TFile only, + * {@link Reader#createScannerByKey(byte[], byte[])} or + * {@link Reader#createScannerByKey(RawComparable, RawComparable)}). + */ + @InterfaceStability.Evolving + public static class Reader implements Closeable { + // The underlying BCFile reader. + final DTBCFile.Reader readerBCF; + + // TFile index, it is loaded lazily. + TFileIndex tfileIndex = null; + final TFileMeta tfileMeta; + final BytesComparator comparator; + + // global begin and end locations. + private final Location begin; + private final Location end; + + /** + * Location representing a virtual position in the TFile. 
+ */ + static final class Location implements Comparable<Location>, Cloneable { + private int blockIndex; + // distance/offset from the beginning of the block + private long recordIndex; + + Location(int blockIndex, long recordIndex) { + set(blockIndex, recordIndex); + } + + void incRecordIndex() { + ++recordIndex; + } + + Location(Location other) { + set(other); + } + + int getBlockIndex() { + return blockIndex; + } + + long getRecordIndex() { + return recordIndex; + } + + void set(int blockIndex, long recordIndex) { + if ((blockIndex | recordIndex) < 0) { + throw new IllegalArgumentException( + "Illegal parameter for BlockLocation."); + } + this.blockIndex = blockIndex; + this.recordIndex = recordIndex; + } + + void set(Location other) { + set(other.blockIndex, other.recordIndex); + } + + /** + * @see java.lang.Comparable#compareTo(java.lang.Object) + */ + @Override + public int compareTo(Location other) { + return compareTo(other.blockIndex, other.recordIndex); + } + + int compareTo(int bid, long rid) { + if (this.blockIndex == bid) { + long ret = this.recordIndex - rid; + if (ret > 0) return 1; + if (ret < 0) return -1; + return 0; + } + return this.blockIndex - bid; + } + + /** + * @see java.lang.Object#clone() + */ + @Override + protected Location clone() throws CloneNotSupportedException { + return (Location)super.clone(); + } + + /** + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = prime + blockIndex; + result = (int) (prime * result + recordIndex); + return result; + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + Location other = (Location) obj; + if (blockIndex != other.blockIndex) return false; + if (recordIndex != other.recordIndex) return false; + return true; + } + } + + /** + * Constructor + * + * 
@param fsdis + * FS input stream of the TFile. + * @param fileLength + * The length of TFile. This is required because we have no easy + * way of knowing the actual size of the input file through the + * File input stream. + * @param conf + * @throws IOException + */ + public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf) + throws IOException { + readerBCF = new DTBCFile.Reader(fsdis, fileLength, conf); + + // first, read TFile meta + BlockReader brMeta = readerBCF.getMetaBlock(TFileMeta.BLOCK_NAME); + try { + tfileMeta = new TFileMeta(brMeta); + } finally { + brMeta.close(); + } + + comparator = tfileMeta.getComparator(); + // Set begin and end locations. + begin = new Location(0, 0); + end = new Location(readerBCF.getBlockCount(), 0); + } + + /** + * Close the reader. The state of the Reader object is undefined after + * close. Calling close() for multiple times has no effect. + */ + @Override + public void close() throws IOException { + readerBCF.close(); + } + + /** + * Get the begin location of the TFile. + * + * @return If TFile is not empty, the location of the first key-value pair. + * Otherwise, it returns end(). + */ + Location begin() { + return begin; + } + + /** + * Get the end location of the TFile. + * + * @return The location right after the last key-value pair in TFile. + */ + Location end() { + return end; + } + + /** + * Get the string representation of the comparator. + * + * @return If the TFile is not sorted by keys, an empty string will be + * returned. Otherwise, the actual comparator string that is + * provided during the TFile creation time will be returned. + */ + public String getComparatorName() { + return tfileMeta.getComparatorString(); + } + + /** + * Is the TFile sorted? + * + * @return true if TFile is sorted. + */ + public boolean isSorted() { + return tfileMeta.isSorted(); + } + + /** + * Get the number of key-value pair entries in TFile. 
+ * + * @return the number of key-value pairs in TFile + */ + public long getEntryCount() { + return tfileMeta.getRecordCount(); + } + + /** + * Lazily loading the TFile index. + * + * @throws IOException + */ + synchronized void checkTFileDataIndex() throws IOException { + if (tfileIndex == null) { + BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME); + try { + tfileIndex = + new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta + .getComparator()); + } finally { + brIndex.close(); + } + } + } + + /** + * Get the first key in the TFile. + * + * @return The first key in the TFile. + * @throws IOException + */ + public RawComparable getFirstKey() throws IOException { + checkTFileDataIndex(); + return tfileIndex.getFirstKey(); + } + + /** + * Get the last key in the TFile. + * + * @return The last key in the TFile. + * @throws IOException + */ + public RawComparable getLastKey() throws IOException { + checkTFileDataIndex(); + return tfileIndex.getLastKey(); + } + + /** + * Get a Comparator object to compare Entries. It is useful when you want + * stores the entries in a collection (such as PriorityQueue) and perform + * sorting or comparison among entries based on the keys without copying out + * the key. + * + * @return An Entry Comparator.. + */ + public Comparator<Scanner.Entry> getEntryComparator() { + if (!isSorted()) { + throw new RuntimeException( + "Entries are not comparable for unsorted TFiles"); + } + + return new Comparator<Scanner.Entry>() { + /** + * Provide a customized comparator for Entries. This is useful if we + * have a collection of Entry objects. However, if the Entry objects + * come from different TFiles, users must ensure that those TFiles share + * the same RawComparator. 
+ */ + @Override + public int compare(Scanner.Entry o1, Scanner.Entry o2) { + return comparator.compare(o1.getBlockBuffer(), o1.getKeyOffset(), o1.getKeyLength(), o2 + .getBlockBuffer(), o2.getKeyOffset(), o2.getKeyLength()); + } + }; + } + + /** + * Get an instance of the RawComparator that is constructed based on the + * string comparator representation. + * + * @return a Comparator that can compare RawComparable's. + */ + public Comparator<RawComparable> getComparator() { + return comparator; + } + + /** + * Stream access to a meta block.`` + * + * @param name + * The name of the meta block. + * @return The input stream. + * @throws IOException + * on I/O error. + * @throws MetaBlockDoesNotExist + * If the meta block with the name does not exist. + */ + public DataInputStream getMetaBlock(String name) throws IOException, + MetaBlockDoesNotExist { + return readerBCF.getMetaBlock(name); + } + + /** + * if greater is true then returns the beginning location of the block + * containing the key strictly greater than input key. if greater is false + * then returns the beginning location of the block greater than equal to + * the input key + * + * @param key + * the input key + * @param greater + * boolean flag + * @return + * @throws IOException + */ + Location getBlockContainsKey(RawComparable key, boolean greater) + throws IOException { + if (!isSorted()) { + throw new RuntimeException("Seeking in unsorted TFile"); + } + checkTFileDataIndex(); + int blkIndex = + (greater) ? 
tfileIndex.upperBound(key) : tfileIndex.lowerBound(key); + if (blkIndex < 0) return end; + return new Location(blkIndex, 0); + } + + Location getLocationByRecordNum(long recNum) throws IOException { + checkTFileDataIndex(); + return tfileIndex.getLocationByRecordNum(recNum); + } + + long getRecordNumByLocation(Location location) throws IOException { + checkTFileDataIndex(); + return tfileIndex.getRecordNumByLocation(location); + } + + int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) { + if (!isSorted()) { + throw new RuntimeException("Cannot compare keys for unsorted TFiles."); + } + return comparator.compare(a, o1, l1, b, o2, l2); + } + + int compareKeys(RawComparable a, RawComparable b) { + if (!isSorted()) { + throw new RuntimeException("Cannot compare keys for unsorted TFiles."); + } + return comparator.compare(a, b); + } + + /** + * Get the location pointing to the beginning of the first key-value pair in + * a compressed block whose byte offset in the TFile is greater than or + * equal to the specified offset. + * + * @param offset + * the user supplied offset. + * @return the location to the corresponding entry; or end() if no such + * entry exists. + */ + Location getLocationNear(long offset) { + int blockIndex = readerBCF.getBlockIndexNear(offset); + if (blockIndex == -1) return end; + return new Location(blockIndex, 0); + } + + /** + * Get the RecordNum for the first key-value pair in a compressed block + * whose byte offset in the TFile is greater than or equal to the specified + * offset. + * + * @param offset + * the user supplied offset. + * @return the RecordNum to the corresponding entry. If no such entry + * exists, it returns the total entry count. + * @throws IOException + */ + public long getRecordNumNear(long offset) throws IOException { + return getRecordNumByLocation(getLocationNear(offset)); + } + + /** + * Get a sample key that is within a block whose starting offset is greater + * than or equal to the specified offset. 
+ * + * @param offset + * The file offset. + * @return the key that fits the requirement; or null if no such key exists + * (which could happen if the offset is close to the end of the + * TFile). + * @throws IOException + */ + public RawComparable getKeyNear(long offset) throws IOException { + int blockIndex = readerBCF.getBlockIndexNear(offset); + if (blockIndex == -1) return null; + checkTFileDataIndex(); + return new ByteArray(tfileIndex.getEntry(blockIndex).key); + } + + /** + * Get a scanner than can scan the whole TFile. + * + * @return The scanner object. A valid Scanner is always returned even if + * the TFile is empty. + * @throws IOException + */ + public Scanner createScanner() throws IOException { + return new Scanner(this, begin, end); + } + + /** + * Get a scanner that covers a portion of TFile based on byte offsets. + * + * @param offset + * The beginning byte offset in the TFile. + * @param length + * The length of the region. + * @return The actual coverage of the returned scanner tries to match the + * specified byte-region but always round up to the compression + * block boundaries. It is possible that the returned scanner + * contains zero key-value pairs even if length is positive. + * @throws IOException + */ + public Scanner createScannerByByteRange(long offset, long length) throws IOException { + return new Scanner(this, offset, offset + length); + } + + /** + * Get a scanner that covers a portion of TFile based on keys. + * + * @param beginKey + * Begin key of the scan (inclusive). If null, scan from the first + * key-value entry of the TFile. + * @param endKey + * End key of the scan (exclusive). If null, scan up to the last + * key-value entry of the TFile. + * @return The actual coverage of the returned scanner will cover all keys + * greater than or equal to the beginKey and less than the endKey. + * @throws IOException + * + * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead. 
+ */ + @Deprecated + public Scanner createScanner(byte[] beginKey, byte[] endKey) + throws IOException { + return createScannerByKey(beginKey, endKey); + } + + /** + * Get a scanner that covers a portion of TFile based on keys. + * + * @param beginKey + * Begin key of the scan (inclusive). If null, scan from the first + * key-value entry of the TFile. + * @param endKey + * End key of the scan (exclusive). If null, scan up to the last + * key-value entry of the TFile. + * @return The actual coverage of the returned scanner will cover all keys + * greater than or equal to the beginKey and less than the endKey. + * @throws IOException + */ + public Scanner createScannerByKey(byte[] beginKey, byte[] endKey) + throws IOException { + return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey, + 0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey, + 0, endKey.length)); + } + + /** + * Get a scanner that covers a specific key range. + * + * @param beginKey + * Begin key of the scan (inclusive). If null, scan from the first + * key-value entry of the TFile. + * @param endKey + * End key of the scan (exclusive). If null, scan up to the last + * key-value entry of the TFile. + * @return The actual coverage of the returned scanner will cover all keys + * greater than or equal to the beginKey and less than the endKey. + * @throws IOException + * + * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)} + * instead. + */ + @Deprecated + public Scanner createScanner(RawComparable beginKey, RawComparable endKey) + throws IOException { + return createScannerByKey(beginKey, endKey); + } + + /** + * Get a scanner that covers a specific key range. + * + * @param beginKey + * Begin key of the scan (inclusive). If null, scan from the first + * key-value entry of the TFile. + * @param endKey + * End key of the scan (exclusive). If null, scan up to the last + * key-value entry of the TFile. 
+ * @return The actual coverage of the returned scanner will cover all keys + * greater than or equal to the beginKey and less than the endKey. + * @throws IOException + */ + public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey) + throws IOException { + if ((beginKey != null) && (endKey != null) + && (compareKeys(beginKey, endKey) >= 0)) { + return new Scanner(this, beginKey, beginKey); + } + return new Scanner(this, beginKey, endKey); + } + + /** + * Create a scanner that covers a range of records. + * + * @param beginRecNum + * The RecordNum for the first record (inclusive). + * @param endRecNum + * The RecordNum for the last record (exclusive). To scan the whole + * file, either specify endRecNum==-1 or endRecNum==getEntryCount(). + * @return The TFile scanner that covers the specified range of records. + * @throws IOException + */ + public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum) + throws IOException { + if (beginRecNum < 0) beginRecNum = 0; + if (endRecNum < 0 || endRecNum > getEntryCount()) { + endRecNum = getEntryCount(); + } + return new Scanner(this, getLocationByRecordNum(beginRecNum), + getLocationByRecordNum(endRecNum)); + } + + /** + * The TFile Scanner. The Scanner has an implicit cursor, which, upon + * creation, points to the first key-value pair in the scan range. If the + * scan range is empty, the cursor will point to the end of the scan range. + * <p> + * Use {@link Scanner#atEnd()} to test whether the cursor is at the end + * location of the scanner. + * <p> + * Use {@link Scanner#advance()} to move the cursor to the next key-value + * pair (or end if none exists). Use seekTo methods ( + * {@link Scanner#seekTo(byte[])} or + * {@link Scanner#seekTo(byte[], int, int)}) to seek to any arbitrary + * location in the covered range (including backward seeking). Use + * {@link Scanner#rewind()} to seek back to the beginning of the scanner. 
+     * Use {@link Scanner#seekToEnd()} to seek to the end of the scanner.
+     * <p>
+     * Actual keys and values may be obtained through {@link Scanner.Entry}
+     * object, which is obtained through {@link Scanner#entry()}.
+     */
+    public static class Scanner implements Closeable {
+      // The underlying TFile reader.
+      final Reader reader;
+      // current block (null if reaching end)
+      private BlockReader blkReader;
+      // Array returned by the current block stream's getBuf() (set in
+      // initBlock()); key and value offsets recorded by checkKey() index
+      // into it.
+      private byte[] blockBuffer;
+
+      // The scan covers [beginLocation, endLocation); currentLocation is the
+      // cursor.
+      Location beginLocation;
+      Location endLocation;
+      Location currentLocation;
+
+      // flag to ensure value is only examined once.
+      boolean valueChecked = false;
+      // Keys are not copied into a separate buffer; they are read in place
+      // from blockBuffer at keyOffset.
+      // length of key, -1 means key is invalid.
+      int klen = -1;
+      // offset of the current key within blockBuffer (set by checkKey()).
+      int keyOffset = 0;
+
+      // upper bound on the chunk size used when streaming a value out.
+      static final int MAX_VAL_TRANSFER_BUF_SIZE = 128 * 1024;
+      BytesWritable valTransferBuffer;
+
+      // Decoder for the chunked value of the current record, wrapped by
+      // valueDataInputStream for the Entry value accessors.
+      ChunkDecoder valueBufferInputStream;
+      DataInputStream valueDataInputStream;
+      // vlen == -1 if unknown.
+      int vlen;
+      // offset of the value within blockBuffer; only updated by checkKey()
+      // when the value length is known.
+      int valueOffset = 0;
+
+      /**
+       * Constructor
+       *
+       * @param reader
+       *          The TFile reader object.
+       * @param offBegin
+       *          Begin byte-offset of the scan.
+       * @param offEnd
+       *          End byte-offset of the scan.
+       * @throws IOException
+       *
+       *           The offsets will be rounded to the beginning of a compressed
+       *           block whose offset is greater than or equal to the specified
+       *           offset.
+       */
+      protected Scanner(Reader reader, long offBegin, long offEnd)
+          throws IOException {
+        this(reader, reader.getLocationNear(offBegin), reader
+            .getLocationNear(offEnd));
+      }
+
+      /**
+       * Constructor
+       *
+       * @param reader
+       *          The TFile reader object.
+       * @param begin
+       *          Begin location of the scan.
+       * @param end
+       *          End location of the scan.
+       * @throws IOException
+       */
+      Scanner(Reader reader, Location begin, Location end) throws IOException {
+        this.reader = reader;
+        // ensure the TFile index is loaded throughout the life of scanner.
+        reader.checkTFileDataIndex();
+        // NOTE(review): begin/end are stored by reference, and the key-based
+        // constructor later mutates them via set(). Reader.getBlockContainsKey()
+        // can return the Reader's own `end` Location, so confirm that this
+        // aliasing is harmless.
+        beginLocation = begin;
+        endLocation = end;
+
+        valTransferBuffer = new BytesWritable();
+        valueBufferInputStream = new ChunkDecoder();
+        valueDataInputStream = new DataInputStream(valueBufferInputStream);
+
+        if (beginLocation.compareTo(endLocation) >= 0) {
+          // Empty range: the cursor starts at the end location and no block
+          // is opened.
+          currentLocation = new Location(endLocation);
+        } else {
+          // Placeholder position; initBlock() sets the real block index.
+          currentLocation = new Location(0, 0);
+          initBlock(beginLocation.getBlockIndex());
+          inBlockAdvance(beginLocation.getRecordIndex());
+        }
+      }
+
+      /**
+       * Constructor
+       *
+       * @param reader
+       *          The TFile reader object.
+       * @param beginKey
+       *          Begin key of the scan. If null, scan from the first <K,V>
+       *          entry of the TFile.
+       * @param endKey
+       *          End key of the scan. If null, scan up to the last <K, V> entry
+       *          of the TFile.
+       * @throws IOException
+       */
+      protected Scanner(Reader reader, RawComparable beginKey,
+          RawComparable endKey) throws IOException {
+        this(reader, (beginKey == null) ? reader.begin() : reader
+            .getBlockContainsKey(beginKey, false), reader.end());
+        if (beginKey != null) {
+          // Tighten the begin location to the first entry >= beginKey within
+          // the located block.
+          inBlockAdvance(beginKey, false);
+          beginLocation.set(currentLocation);
+        }
+        if (endKey != null) {
+          // Find the first entry >= endKey and make it the (exclusive) end,
+          // then move the cursor back to the beginning of the range.
+          seekTo(endKey, false);
+          endLocation.set(currentLocation);
+          seekTo(beginLocation);
+        }
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is greater than or equal
+       * to the input key. Synonymous to seekTo(key, 0, key.length). The entry
+       * returned by the previous entry() call will be invalid.
+       *
+       * @param key
+       *          The input key
+       * @return true if we find an equal key.
+       * @throws IOException
+       */
+      public boolean seekTo(byte[] key) throws IOException {
+        return seekTo(key, 0, key.length);
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is greater than or equal
+       * to the input key. The entry returned by the previous entry() call will
+       * be invalid.
+       *
+       * @param key
+       *          The input key
+       * @param keyOffset
+       *          offset in the key buffer.
+       * @param keyLen
+       *          key buffer length.
+       * @return true if we find an equal key; false otherwise.
+       * @throws IOException
+       */
+      public boolean seekTo(byte[] key, int keyOffset, int keyLen)
+          throws IOException {
+        return seekTo(new ByteArray(key, keyOffset, keyLen), false);
+      }
+
+      /**
+       * Seek to the first entry whose key is >= key (or > key when beyond is
+       * true), clamped to the scan range.
+       *
+       * @return true if an entry with an equal key was found (never when
+       *         beyond is true); false otherwise, including when the cursor
+       *         was parked at the end location.
+       */
+      private boolean seekTo(RawComparable key, boolean beyond)
+          throws IOException {
+        Location l = reader.getBlockContainsKey(key, beyond);
+        // Clamp the candidate block to the scan range.
+        if (l.compareTo(beginLocation) < 0) {
+          l = beginLocation;
+        } else if (l.compareTo(endLocation) >= 0) {
+          seekTo(endLocation);
+          return false;
+        }
+
+        // check if what we are seeking is in the later part of the current
+        // block.
+        if (atEnd() || (l.getBlockIndex() != currentLocation.getBlockIndex())
+            || (compareCursorKeyTo(key) >= 0)) {
+          // sorry, we must seek to a different location first.
+          seekTo(l);
+        }
+
+        return inBlockAdvance(key, beyond);
+      }
+
+      /**
+       * Move the cursor to the new location. The entry returned by the previous
+       * entry() call will be invalid.
+       *
+       * @param l
+       *          new cursor location. It must fall between the begin and end
+       *          location of the scanner.
+       * @throws IOException
+       */
+      private void seekTo(Location l) throws IOException {
+        if (l.compareTo(beginLocation) < 0) {
+          throw new IllegalArgumentException(
+              "Attempt to seek before the begin location.");
+        }
+
+        if (l.compareTo(endLocation) > 0) {
+          throw new IllegalArgumentException(
+              "Attempt to seek after the end location.");
+        }
+
+        if (l.compareTo(endLocation) == 0) {
+          parkCursorAtEnd();
+          return;
+        }
+
+        if (l.getBlockIndex() != currentLocation.getBlockIndex()) {
+          // going to a totally different block
+          initBlock(l.getBlockIndex());
+        } else {
+          if (valueChecked) {
+            // may temporarily go beyond the last record in the block (in which
+            // case the next if loop will always be true).
+            // NOTE(review): valueChecked is only reset by checkKey(). If the
+            // cursor already sits at endLocation in this block with
+            // valueChecked still true, inBlockAdvance(1) -> checkKey() throws
+            // EOFException. Confirm whether that state is reachable.
+            inBlockAdvance(1);
+          }
+          // Records cannot be read backwards, so re-open the block to move
+          // the cursor back.
+          if (l.getRecordIndex() < currentLocation.getRecordIndex()) {
+            initBlock(l.getBlockIndex());
+          }
+        }
+
+        inBlockAdvance(l.getRecordIndex() - currentLocation.getRecordIndex());
+
+        return;
+      }
+
+      /**
+       * Rewind to the first entry in the scanner. The entry returned by the
+       * previous entry() call will be invalid.
+       *
+       * @throws IOException
+       */
+      public void rewind() throws IOException {
+        seekTo(beginLocation);
+      }
+
+      /**
+       * Seek to the end of the scanner. The entry returned by the previous
+       * entry() call will be invalid.
+       *
+       * @throws IOException
+       */
+      public void seekToEnd() throws IOException {
+        parkCursorAtEnd();
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is greater than or equal
+       * to the input key. Synonymous to lowerBound(key, 0, key.length). The
+       * entry returned by the previous entry() call will be invalid.
+       *
+       * @param key
+       *          The input key
+       * @throws IOException
+       */
+      public void lowerBound(byte[] key) throws IOException {
+        lowerBound(key, 0, key.length);
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is greater than or equal
+       * to the input key. The entry returned by the previous entry() call will
+       * be invalid.
+       *
+       * @param key
+       *          The input key
+       * @param keyOffset
+       *          offset in the key buffer.
+       * @param keyLen
+       *          key buffer length.
+       * @throws IOException
+       */
+      public void lowerBound(byte[] key, int keyOffset, int keyLen)
+          throws IOException {
+        seekTo(new ByteArray(key, keyOffset, keyLen), false);
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is strictly greater than
+       * the input key. Synonymous to upperBound(key, 0, key.length). The entry
+       * returned by the previous entry() call will be invalid.
+       *
+       * @param key
+       *          The input key
+       * @throws IOException
+       */
+      public void upperBound(byte[] key) throws IOException {
+        upperBound(key, 0, key.length);
+      }
+
+      /**
+       * Move the cursor to the first entry whose key is strictly greater than
+       * the input key. The entry returned by the previous entry() call will be
+       * invalid.
+       *
+       * @param key
+       *          The input key
+       * @param keyOffset
+       *          offset in the key buffer.
+       * @param keyLen
+       *          key buffer length.
+       * @throws IOException
+       */
+      public void upperBound(byte[] key, int keyOffset, int keyLen)
+          throws IOException {
+        seekTo(new ByteArray(key, keyOffset, keyLen), true);
+      }
+
+      /**
+       * Move the cursor to the next key-value pair. The entry returned by the
+       * previous entry() call will be invalid.
+       *
+       * @return true if the cursor successfully moves. False when cursor is
+       *         already at the end location and cannot be advanced.
+       * @throws IOException
+       */
+      public boolean advance() throws IOException {
+        if (atEnd()) {
+          return false;
+        }
+
+        int curBid = currentLocation.getBlockIndex();
+        long curRid = currentLocation.getRecordIndex();
+        long entriesInBlock = reader.getBlockEntryCount(curBid);
+        // Stepping past the last record of this block: either the scan ends at
+        // or before the start of the next block, or the next block is opened.
+        if (curRid + 1 >= entriesInBlock) {
+          if (endLocation.compareTo(curBid + 1, 0) <= 0) {
+            // last entry in TFile.
+            parkCursorAtEnd();
+          } else {
+            // last entry in Block.
+            initBlock(curBid + 1);
+          }
+        } else {
+          inBlockAdvance(1);
+        }
+        return true;
+      }
+
+      /**
+       * Load a compressed block for reading. Expecting blockIndex is valid.
+       * Closes the previously open block reader (if any), opens the block at
+       * blockIndex, and positions the cursor at its first record.
+       *
+       * @throws IOException
+       */
+      private void initBlock(int blockIndex) throws IOException {
+        // invalidate the cached key of the previous record
+        klen = -1;
+        if (blkReader != null) {
+          try {
+            blkReader.close();
+          } finally {
+            blkReader = null;
+          }
+        }
+        blkReader = reader.getBlockReader(blockIndex);
+        // keys and values are later addressed by offsets into this array
+        blockBuffer = blkReader.getBlockDataInputStream().getBuf();
+        currentLocation.set(blockIndex, 0);
+      }
+
+      /**
+       * Move the cursor to endLocation, invalidate the cached key and release
+       * the current block reader.
+       */
+      private void parkCursorAtEnd() throws IOException {
+        klen = -1;
+        currentLocation.set(endLocation);
+        if (blkReader != null) {
+          try {
+            blkReader.close();
+          } finally {
+            blkReader = null;
+          }
+        }
+      }
+
+      /**
+       * Close the scanner. Release all resources. The behavior of using the
+       * scanner after calling close is not defined. The entry returned by the
+       * previous entry() call will be invalid.
+       */
+      @Override
+      public void close() throws IOException {
+        parkCursorAtEnd();
+      }
+
+      /**
+       * Is cursor at the end location?
+       *
+       * @return true if the cursor is at the end location.
+       */
+      public boolean atEnd() {
+        return (currentLocation.compareTo(endLocation) >= 0);
+      }
+
+      /**
+       * check whether we have already successfully obtained the key. It also
+       * initializes the valueInputStream.
+       *
+       * @throws EOFException
+       *           if the cursor is at the end location.
+       */
+      void checkKey() throws IOException {
+        // key of the current record is already decoded
+        if (klen >= 0) return;
+        if (atEnd()) {
+          throw new EOFException("No key-value to read");
+        }
+        klen = -1;
+        vlen = -1;
+        valueChecked = false;
+
+        // Record layout as read here: VInt key length, key bytes, then the
+        // value, which is decoded in chunks by valueBufferInputStream.
+        klen = Utils.readVInt(blkReader);
+        keyOffset = blkReader.getBlockDataInputStream().getPos();
+        // NOTE(review): the return value of skip() is ignored; this assumes the
+        // block stream always skips all klen bytes. Confirm.
+        blkReader.getBlockDataInputStream().skip(klen);
+        valueBufferInputStream.reset(blkReader);
+        // If the first value chunk is also the last, the value length is
+        // known and the value starts at the current stream position.
+        if (valueBufferInputStream.isLastChunk()) {
+          vlen = valueBufferInputStream.getRemain();
+          valueOffset = blkReader.getBlockDataInputStream().getPos();
+        }
+      }
+
+      /**
+       * Get an entry to access the key and value.
+       *
+       * @return The Entry object to access the key and value.
+ * @throws IOException + */ + public Entry entry() throws IOException { + checkKey(); + return new Entry(); + } + + /** + * Get the RecordNum corresponding to the entry pointed by the cursor. + * @return The RecordNum corresponding to the entry pointed by the cursor. + * @throws IOException + */ + public long getRecordNum() throws IOException { + return reader.getRecordNumByLocation(currentLocation); + } + + /** + * Internal API. Comparing the key at cursor to user-specified key. + * + * @param other + * user-specified key. + * @return negative if key at cursor is smaller than user key; 0 if equal; + * and positive if key at cursor greater than user key. + * @throws IOException + */ + int compareCursorKeyTo(RawComparable other) throws IOException { + checkKey(); + return reader.compareKeys(blockBuffer, keyOffset, klen, other.buffer(), other + .offset(), other.size()); + } + + /** + * Entry to a <Key, Value> pair. + */ + public class Entry implements Comparable<RawComparable> { + /** + * Get the length of the key. + * + * @return the length of the key. + */ + public int getKeyLength() { + return klen; + } + + public int getKeyOffset() { + return keyOffset; + } + + public int getValueOffset() { + return valueOffset; + } + + public byte[] getBlockBuffer() { + return blockBuffer; + } + + /** + * Copy the key and value in one shot into BytesWritables. This is + * equivalent to getKey(key); getValue(value); + * + * @param key + * BytesWritable to hold key. + * @param value + * BytesWritable to hold value + * @throws IOException + */ + public void get(BytesWritable key, BytesWritable value) + throws IOException { + getKey(key); + getValue(value); + } + + /** + * Copy the key into BytesWritable. The input BytesWritable will be + * automatically resized to the actual key size. + * + * @param key + * BytesWritable to hold the key. 
+ * @throws IOException + */ + public int getKey(BytesWritable key) throws IOException { + key.setSize(getKeyLength()); + getKey(key.getBytes()); + return key.getLength(); + } + + /** + * Copy the value into BytesWritable. The input BytesWritable will be + * automatically resized to the actual value size. The implementation + * directly uses the buffer inside BytesWritable for storing the value. + * The call does not require the value length to be known. + * + * @param value + * @throws IOException + */ + public long getValue(BytesWritable value) throws IOException { + DataInputStream dis = getValueStream(); + int size = 0; + try { + int remain; + while ((remain = valueBufferInputStream.getRemain()) > 0) { + value.setSize(size + remain); + dis.readFully(value.getBytes(), size, remain); + size += remain; + } + return value.getLength(); + } finally { + dis.close(); + } + } + + /** + * Writing the key to the output stream. This method avoids copying key + * buffer from Scanner into user buffer, then writing to the output + * stream. + * + * @param out + * The output stream + * @return the length of the key. + * @throws IOException + */ + public int writeKey(OutputStream out) throws IOException { + out.write(blockBuffer, keyOffset, klen); + return klen; + } + + /** + * Writing the value to the output stream. This method avoids copying + * value data from Scanner into user buffer, then writing to the output + * stream. It does not require the value length to be known. 
+ * + * @param out + * The output stream + * @return the length of the value + * @throws IOException + */ + public long writeValue(OutputStream out) throws IOException { + DataInputStream dis = getValueStream(); + long size = 0; + try { + int chunkSize; + while ((chunkSize = valueBufferInputStream.getRemain()) > 0) { + chunkSize = Math.min(chunkSize, MAX_VAL_TRANSFER_BUF_SIZE); + valTransferBuffer.setSize(chunkSize); + dis.readFully(valTransferBuffer.getBytes(), 0, chunkSize); + out.write(valTransferBuffer.getBytes(), 0, chunkSize); + size += chunkSize; + } + return size; + } finally { + dis.close(); + } + } + + /** + * Copy the key into user supplied buffer. + * + * @param buf + * The buffer supplied by user. The length of the buffer must + * not be shorter than the key length. + * @return The length of the key. + * + * @throws IOException + */ + public int getKey(byte[] buf) throws IOException { + return getKey(buf, 0); + } + + /** + * Copy the key into user supplied buffer. + * + * @param buf + * The buffer supplied by user. + * @param offset + * The starting offset of the user buffer where we should copy + * the key into. Requiring the key-length + offset no greater + * than the buffer length. + * @return The length of the key. + * @throws IOException + */ + public int getKey(byte[] buf, int offset) throws IOException { + if ((offset | (buf.length - offset - klen)) < 0) { + throw new IndexOutOfBoundsException( + "Bufer not enough to store the key"); + } + System.arraycopy(blockBuffer, keyOffset, buf, offset, klen); + return klen; + } + + /** + * Streaming access to the key. Useful for desrializing the key into + * user objects. + * + * @return The input stream. + */ +// public DataInputStream getKeyStream() { +// keyDataInputStream.reset(keyBuffer, klen); +// return keyDataInputStream; +// } + + /** + * Get the length of the value. isValueLengthKnown() must be tested + * true. + * + * @return the length of the value. 
+ */ + public int getValueLength() { + if (vlen >= 0) { + return vlen; + } + + throw new RuntimeException("Value length unknown."); + } + + /** + * Copy value into user-supplied buffer. User supplied buffer must be + * large enough to hold the whole value. The value part of the key-value + * pair pointed by the current cursor is not cached and can only be + * examined once. Calling any of the following functions more than once + * without moving the cursor will result in exception: + * {@link #getValue(byte[])}, {@link #getValue(byte[], int)}, + * {@link #getValueStream}. + * + * @return the length of the value. Does not require + * isValueLengthKnown() to be true. + * @throws IOException + * + */ + public int getValue(byte[] buf) throws IOException { + return getValue(buf, 0); + } + + /** + * Copy value into user-supplied buffer. User supplied buffer must be + * large enough to hold the whole value (starting from the offset). The + * value part of the key-value pair pointed by the current cursor is not + * cached and can only be examined once. Calling any of the following + * functions more than once without moving the cursor will result in + * exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)}, + * {@link #getValueStream}. + * + * @return the length of the value. Does not require + * isValueLengthKnown() to be true. 
+ * @throws IOException + */ + public int getValue(byte[] buf, int offset) throws IOException { + DataInputStream dis = getValueStream(); + try { + if (isValueLengthKnown()) { + if ((offset | (buf.length - offset - vlen)) < 0) { + throw new IndexOutOfBoundsException( + "Buffer too small to hold value"); + } + dis.readFully(buf, offset, vlen); + return vlen; + } + + int nextOffset = offset; + while (nextOffset < buf.length) { + int n = dis.read(buf, nextOffset, buf.length - nextOffset); + if (n < 0) { + break; + } + nextOffset += n; + } + if (dis.read() >= 0) { + // attempt to read one more byte to determine whether we reached + // the + // end or not. + throw new IndexOutOfBoundsException( + "Buffer too small to hold value"); + } + return nextOffset - offset; + } finally { + dis.close(); + } + } + + /** + * Stream access to value. The value part of the key-value pair pointed + * by the current cursor is not cached and can only be examined once. + * Calling any of the following functions more than once without moving + * the cursor will result in exception: {@link #getValue(byte[])}, + * {@link #getValue(byte[], int)}, {@link #getValueStream}. + * + * @return The input stream for reading the value. + * @throws IOException + */ + public DataInputStream getValueStream() throws IOException { + if (valueChecked == true) { + throw new IllegalStateException( + "Attempt to examine value multiple times."); + } + valueChecked = true; + return valueDataInputStream; + } + + /** + * Check whether it is safe to call getValueLength(). + * + * @return true if value length is known before hand. Values less than + * the chunk size will always have their lengths known before + * hand. Values that are written out as a whole (with advertised + * length up-front) will always have their lengths known in + * read. + */ + public boolean isValueLengthKnown() { + return (vlen >= 0); + } + + /** + * Compare the entry key to another key. Synonymous to compareTo(key, 0, + * key.length). 
+ * + * @param buf + * The key buffer. + * @return comparison result between the entry key with the input key. + */ + public int compareTo(byte[] buf) { + return compareTo(buf, 0, buf.length); + } + + /** + * Compare the entry key to another key. Synonymous to compareTo(new + * ByteArray(buf, offset, length) + * + * @param buf + * The key buffer + * @param offset + * offset into the key buffer. + * @param length + * the length of the key. + * @return comparison result between the entry key with the input key. + */ + public int compareTo(byte[] buf, int offset, int length) { + return compareTo(new ByteArray(buf, offset, length)); + } + + /** + * Compare an entry with a RawComparable object. This is useful when + * Entries are stored in a collection, and we want to compare a user + * supplied key. + */ + @Override + public int compareTo(RawComparable key) { + return reader.compareKeys(blockBuffer, getKeyOffset(), getKeyLength(), key.buffer(), + key.offset(), key.size()); + } + + /** + * Compare whether this and other points to the same key value. + */ + @Override + public boolean equals(Object other) { + if (this == other) return true; + if (!(other instanceof Entry)) return false; + return ((Entry) other).compareTo(blockBuffer, getKeyOffset(), getKeyLength()) == 0; + } + + @Override + public int hashCode() { + return WritableComparator.hashBytes(blockBuffer, getKeyOffset(), getKeyLength()); + } + } + + /** + * Advance cursor by n positions within the block. + * + * @param n + * Number of key-value pairs to skip in block. + * @throws IOException + */ + private void inBlockAdvance(long n) throws IOException { + for (long i = 0; i < n; ++i) { + checkKey(); + if (!valueBufferInputStream.isClosed()) { + valueBufferInputStream.close(); + } + klen = -1; + currentLocation.incRecordIndex(); + } + } + + /** + * Advance cursor in block until we find a key that is greater than or + * equal to the input key. + * + * @param key + * Key to compare. 
+ * @param greater + * advance until we find a key greater than the input key. + * @return true if we find a equal key. + * @throws IOException + */ + private boolean inBlockAdvance(RawComparable key, boolean greater) + throws IOException { + int curBid = currentLocation.getBlockIndex(); + long entryInBlock = reader.getBlockEntryCount(curBid); + if (curBid == endLocation.getBlockIndex()) { + entryInBlock = endLocation.getRecordIndex(); + } + + while (currentLocation.getRecordIndex() < entryInBlock) { + int cmp = compareCursorKeyTo(key); + if (cmp > 0) return false; + if (cmp == 0 && !greater) return true; + if (!valueBufferInputStream.isClosed()) { + valueBufferInputStream.close(); + } + klen = -1; + currentLocation.incRecordIndex(); + } + + throw new RuntimeException("Cannot find matching key in block."); + } + } + + long getBlockEntryCount(int curBid) { + return tfileIndex.getEntry(curBid).entries(); + } + + BlockReader getBlockReader(int blockIndex) throws IOException { + return readerBCF.getDataBlock(blockIndex); + } + } + + /** + * Data structure representing "TFile.meta" meta block. + */ + static final class TFileMeta { + final static String BLOCK_NAME = "TFile.meta"; + final Version version; + private long recordCount; + private final String strComparator; + private final BytesComparator comparator; + + // ctor for writes + public TFileMeta(String comparator) { + // set fileVersion to API version when we create it. + version = DTFile.API_VERSION; + recordCount = 0; + strComparator = (comparator == null) ? 
"" : comparator; + this.comparator = makeComparator(strComparator); + } + + // ctor for reads + public TFileMeta(DataInput in) throws IOException { + version = new Version(in); + if (!version.compatibleWith(DTFile.API_VERSION)) { + throw new RuntimeException("Incompatible TFile fileVersion."); + } + recordCount = Utils.readVLong(in); + strComparator = Utils.readString(in); + comparator = makeComparator(strComparator); + } + + @SuppressWarnings("unchecked") + static BytesComparator makeComparator(String comparator) { + if (comparator.length() == 0) { + // unsorted keys + return null; + } + if (comparator.equals(COMPARATOR_MEMCMP)) { + // default comparator + return new BytesComparator(new MemcmpRawComparator()); + } else if (comparator.startsWith(COMPARATOR_JCLASS)) { + String compClassName = + comparator.substring(COMPARATOR_JCLASS.length()).trim(); + try { + Class compClass = Class.forName(compClassName); + // use its default ctor to create an instance + return new BytesComparator((RawComparator<Object>) compClass + .newInstance()); + } catch (Exception e) { + throw new IllegalArgumentException( + "Failed to instantiate comparator: " + comparator + "(" + + e.toString() + ")"); + } + } else { + throw new IllegalArgumentException("Unsupported comparator: " + + comparator); + } + } + + public void write(DataOutput out) throws IOException { + DTFile.API_VERSION.write(out); + Utils.writeVLong(out, recordCount); + Utils.writeString(out, strComparator); + } + + public long getRecordCount() { + return recordCount; + } + + public void incRecordCount() { + ++recordCount; + } + + public boolean isSorted() { + return !strComparator.isEmpty(); + } + + public String getComparatorString() { + return strComparator; + } + + public BytesComparator getComparator() { + return comparator; + } + + public Version getVersion() { + return version; + } + } // END: class MetaTFileMeta + + /** + * Data structure representing "TFile.index" meta block. 
+ */ + static class TFileIndex { + final static String BLOCK_NAME = "TFile.index"; + private ByteArray firstKey; + private final ArrayList<TFileIndexEntry> index; + private final ArrayList<Long> recordNumIndex; + private final BytesComparator comparator; + private long sum = 0; + + /** + * For reading from file. + * + * @throws IOException + */ + public TFileIndex(int entryCount, DataInput in, BytesComparator comparator) + throws IOException { + index = new ArrayList<TFileIndexEntry>(entryCount); + recordNumIndex = new ArrayList<Long>(entryCount); + int size = Utils.readVInt(in); // size for the first key entry. + if (size > 0) { + byte[] buffer = new byte[size]; + in.readFully(buffer); + DataInputStream firstKeyInputStream = + new DataInputStream(new ByteArrayInputStream(buffer, 0, size)); + + int firstKeyLength = Utils.readVInt(firstKeyInputStream); + firstKey = new ByteArray(new byte[firstKeyLength]); + firstKeyInputStream.readFully(firstKey.buffer()); + + for (int i = 0; i < entryCount; i++) { + size = Utils.readVInt(in); + if (buffer.length < size) { + buffer = new byte[size]; + } + in.readFully(buffer, 0, size); + TFileIndexEntry idx = + new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream( + buffer, 0, size))); + index.add(idx); + sum += idx.entries(); + recordNumIndex.add(sum); + } + } else { + if (entryCount != 0) { + throw new RuntimeException("Internal error"); + } + } + this.comparator = comparator; + } + + /** + * @param key + * input key. + * @return the ID of the first block that contains key >= input key. Or -1 + * if no such block exists. + */ + public int lowerBound(RawComparable key) { + if (comparator == null) { + throw new RuntimeException("Cannot search in unsorted TFile"); + } + + if (firstKey == null) { + return -1; // not found + } + + int ret = Utils.lowerBound(index, key, comparator); + if (ret == index.size()) { + return -1; + } + return ret; + } + + /** + * @param key + * input key. 
+ * @return the ID of the first block that contains key > input key. Or -1 + * if no such block exists. + */ + public int upperBound(RawComparable key) { + if (comparator == null) { + throw new RuntimeException("Cannot search in unsorted TFile"); + } + + if (firstKey == null) { + return -1; // not found + } + + int ret = Utils.upperBound(index, key, comparator); + if (ret == index.size()) { + return -1; + } + return ret; + } + + /** + * For writing to file. + */ + public TFileIndex(BytesComparator comparator) { + index = new ArrayList<TFileIndexEntry>(); + recordNumIndex = new ArrayList<Long>(); + this.comparator = comparator; + } + + public RawComparable getFirstKey() { + return firstKey; + } + + public Reader.Location getLocationByRecordNum(long recNum) { + int idx = Utils.upperBound(recordNumIndex, recNum); + long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1); + return new Reader.Location(idx, recNum-lastRecNum); + } + + public long getRecordNumByLocation(Reader.Location location) { + int blkIndex = location.getBlockIndex(); + long lastRecNum = (blkIndex == 0) ? 
0: recordNumIndex.get(blkIndex-1); + return lastRecNum + location.getRecordIndex(); + } + + public void setFirstKey(byte[] key, int offset, int length) { + firstKey = new ByteArray(new byte[length]); + System.arraycopy(key, offset, firstKey.buffer(), 0, length); + } + + public RawComparable getLastKey() { + if (index.size() == 0) { + return null; + } + return new ByteArray(index.get(index.size() - 1).buffer()); + } + + public void addEntry(TFileIndexEntry keyEntry) { + index.add(keyEntry); + sum += keyEntry.entries(); + recordNumIndex.add(sum); + } + + public TFileIndexEntry getEntry(int bid) { + return index.get(bid); + } + + public void write(DataOutput out) throws IOException { + if (firstKey == null) { + Utils.writeVInt(out, 0); + return; + } + + DataOutputBuffer dob = new DataOutputBuffer(); + Utils.writeVInt(dob, firstKey.size()); + dob.write(firstKey.buffer()); + Utils.writeVInt(out, dob.size()); + out.write(dob.getData(), 0, dob.getLength()); + + for (TFileIndexEntry entry : index) { + dob.reset(); + entry.write(dob); + Utils.writeVInt(out, dob.getLength()); + out.write(dob.getData(), 0, dob.getLength()); + } + } + } + + /** + * TFile Data Index entry. We should try to make the memory footprint of each + * index entry as small as possible. + */ + static final class TFileIndexEntry implements RawComparable { + final byte[] key; + // count of <key, value> entries in the block. 
+ final long kvEntries; + + public TFileIndexEntry(DataInput in) throws IOException { + int len = Utils.readVInt(in); + key = new byte[len]; + in.readFully(key, 0, len); + kvEntries = Utils.readVLong(in); + } + + // default entry, without any padding + public TFileIndexEntry(byte[] newkey, int offset, int len, long entries) { + key = new byte[len]; + System.arraycopy(newkey, offset, key, 0, len); + this.kvEntries = entries; + } + + @Override + public byte[] buffer() { + return key; + } + + @Override + public int offset() { + return 0; + } + + @Override + public int size() { + return key.length; + } + + long entries() { + return kvEntries; + } + + public void write(DataOutput out) throws IOException { + Utils.writeVInt(out, key.length); + out.write(key, 0, key.length); + Utils.writeVLong(out, kvEntries); + } + } + + /** + * Dumping the TFile information. + * + * @param args + * A list of TFile paths. + */ + public static void main(String[] args) { + System.out.printf("TFile Dumper (TFile %s, BCFile %s)\n", DTFile.API_VERSION + .toString(), DTBCFile.API_VERSION.toString()); + if (args.length == 0) { + System.out + .println("Usage: java ... com.datatorrent.contrib.hdht.tfile.withcache.TFile tfile-path [tfile-path ...]"); + System.exit(0); + } + Configuration conf = new Configuration(); + + for (String file : args) { + System.out.println("===" + file + "==="); + try { + TFileDumper.dumpInfo(file, System.out, conf); + } catch (IOException e) { + e.printStackTrace(System.err); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java b/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java new file mode 100644 index 0000000..25e4f27 --- /dev/null +++ b/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.io.ByteArrayInputStream; + +/** + * A reusable ByteArrayInputStream extends {@link ByteArrayInputStream} to avoid creating stream object on same byte array. 
+ * <br><br>Call renew() method to reuse this stream from beginning + * + * @since 2.0.0 + */ +public class ReusableByteArrayInputStream extends ByteArrayInputStream +{ + + private final int initialOffset; + + private final int initialLength; + + public ReusableByteArrayInputStream(byte[] buf, int offset, int length) + { + super(buf, offset, length); + this.initialLength = Math.min(offset + length, buf.length); + this.initialOffset = offset; + } + + public ReusableByteArrayInputStream(byte[] buf) + { + super(buf); + this.initialLength = buf.length; + this.initialOffset = 0; + } + + public void renew() + { + pos = initialOffset; + count = initialLength; + mark = 0; + } + + + public int getPos(){ + return pos; + } + + public byte[] getBuf(){ + return buf; + } + +}
