http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java deleted file mode 100644 index e513ccd..0000000 --- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeqFileComparison.java +++ /dev/null @@ -1,802 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.io.file.tfile; - -import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Random; -import java.util.StringTokenizer; - -import junit.framework.TestCase; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry; -import org.apache.hadoop.util.Time; - -public class TestTFileSeqFileComparison extends TestCase { - MyOptions options; - - private FileSystem fs; - private Configuration conf; - private long startTimeEpoch; - private long finishTimeEpoch; - private DateFormat formatter; - byte[][] dictionary; - - @Override - public void setUp() throws IOException { - if (options == null) { - options = new MyOptions(new String[0]); - } - - conf = new Configuration(); - conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize); - conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize); - Path path = new Path(options.rootDir); - fs = path.getFileSystem(conf); - formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - setUpDictionary(); - } - - private void setUpDictionary() { - Random rng = new Random(); - dictionary = new byte[options.dictSize][]; - for (int i = 0; i < options.dictSize; ++i) { - int len = - rng.nextInt(options.maxWordLen - options.minWordLen) - + options.minWordLen; - dictionary[i] = new byte[len]; - 
rng.nextBytes(dictionary[i]); - } - } - - @Override - public void tearDown() throws IOException { - // do nothing - } - - public void startTime() throws IOException { - startTimeEpoch = Time.now(); - System.out.println(formatTime() + " Started timing."); - } - - public void stopTime() throws IOException { - finishTimeEpoch = Time.now(); - System.out.println(formatTime() + " Stopped timing."); - } - - public long getIntervalMillis() throws IOException { - return finishTimeEpoch - startTimeEpoch; - } - - public void printlnWithTimestamp(String message) throws IOException { - System.out.println(formatTime() + " " + message); - } - - /* - * Format millis into minutes and seconds. - */ - public String formatTime(long milis) { - return formatter.format(milis); - } - - public String formatTime() { - return formatTime(Time.now()); - } - - private interface KVAppendable { - public void append(BytesWritable key, BytesWritable value) - throws IOException; - - public void close() throws IOException; - } - - private interface KVReadable { - public byte[] getKey(); - - public byte[] getValue(); - - public int getKeyLength(); - - public int getValueLength(); - - public boolean next() throws IOException; - - public void close() throws IOException; - } - - static class TFileAppendable implements KVAppendable { - private FSDataOutputStream fsdos; - private TFile.Writer writer; - - public TFileAppendable(FileSystem fs, Path path, String compress, - int minBlkSize, int osBufferSize, Configuration conf) - throws IOException { - this.fsdos = fs.create(path, true, osBufferSize); - this.writer = new TFile.Writer(fsdos, minBlkSize, compress, null, conf); - } - - @Override - public void append(BytesWritable key, BytesWritable value) - throws IOException { - writer.append(key.get(), 0, key.getSize(), value.get(), 0, value - .getSize()); - } - - @Override - public void close() throws IOException { - writer.close(); - fsdos.close(); - } - } - - static class TFileReadable implements KVReadable { - private FSDataInputStream fsdis; - private DTFile.Reader reader; - private DTFile.Reader.Scanner scanner; - private byte[] keyBuffer; - private int keyLength; - private byte[] valueBuffer; - private int valueLength; - - public TFileReadable(FileSystem fs, Path path, int osBufferSize, - Configuration conf) throws IOException { - this.fsdis = fs.open(path, osBufferSize); - this.reader = - new DTFile.Reader(fsdis, fs.getFileStatus(path).getLen(), conf); - this.scanner = reader.createScanner(); - keyBuffer = new byte[32]; - valueBuffer = new byte[32]; - } - - private void checkKeyBuffer(int size) { - if (size <= keyBuffer.length) { - return; - } - keyBuffer = - new byte[Math.max(2 * keyBuffer.length, 2 * size - keyBuffer.length)]; - } - - private void checkValueBuffer(int size) { - if (size <= valueBuffer.length) { - return; - } - valueBuffer = - new byte[Math.max(2 * valueBuffer.length, 2 * size - - valueBuffer.length)]; - } - - @Override - public byte[] getKey() { - return keyBuffer; - } - - @Override - public int getKeyLength() { - return keyLength; - } - - @Override - public byte[] getValue() { - return valueBuffer; - } - - @Override - public int getValueLength() { - return valueLength; - } - - @Override - public boolean next() throws IOException { - if (scanner.atEnd()) return false; - Entry entry = scanner.entry(); - keyLength = entry.getKeyLength(); - checkKeyBuffer(keyLength); - entry.getKey(keyBuffer); - valueLength = entry.getValueLength(); - checkValueBuffer(valueLength); - entry.getValue(valueBuffer); - 
scanner.advance(); - return true; - } - - @Override - public void close() throws IOException { - scanner.close(); - reader.close(); - fsdis.close(); - } - } - - static class SeqFileAppendable implements KVAppendable { - private FSDataOutputStream fsdos; - private SequenceFile.Writer writer; - - public SeqFileAppendable(FileSystem fs, Path path, int osBufferSize, - String compress, int minBlkSize) throws IOException { - Configuration conf = new Configuration(); - conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, - true); - - CompressionCodec codec = null; - if ("lzo".equals(compress)) { - codec = Compression.Algorithm.LZO.getCodec(); - } - else if ("gz".equals(compress)) { - codec = Compression.Algorithm.GZ.getCodec(); - } - else if (!"none".equals(compress)) - throw new IOException("Codec not supported."); - - this.fsdos = fs.create(path, true, osBufferSize); - - if (!"none".equals(compress)) { - writer = - SequenceFile.createWriter(conf, fsdos, BytesWritable.class, - BytesWritable.class, SequenceFile.CompressionType.BLOCK, codec); - } - else { - writer = - SequenceFile.createWriter(conf, fsdos, BytesWritable.class, - BytesWritable.class, SequenceFile.CompressionType.NONE, null); - } - } - - @Override - public void append(BytesWritable key, BytesWritable value) - throws IOException { - writer.append(key, value); - } - - @Override - public void close() throws IOException { - writer.close(); - fsdos.close(); - } - } - - static class SeqFileReadable implements KVReadable { - private SequenceFile.Reader reader; - private BytesWritable key; - private BytesWritable value; - - public SeqFileReadable(FileSystem fs, Path path, int osBufferSize) - throws IOException { - Configuration conf = new Configuration(); - conf.setInt("io.file.buffer.size", osBufferSize); - reader = new SequenceFile.Reader(fs, path, conf); - key = new BytesWritable(); - value = new BytesWritable(); - } - - @Override - public byte[] getKey() { - return key.get(); - } - - @Override - public int getKeyLength() { - return key.getSize(); - } - - @Override - public byte[] getValue() { - return value.get(); - } - - @Override - public int getValueLength() { - return value.getSize(); - } - - @Override - public boolean next() throws IOException { - return reader.next(key, value); - } - - @Override - public void close() throws IOException { - reader.close(); - } - } - - private void reportStats(Path path, long totalBytes) throws IOException { - long duration = getIntervalMillis(); - long fsize = fs.getFileStatus(path).getLen(); - printlnWithTimestamp(String.format( - "Duration: %dms...total size: %.2fMB...raw thrpt: %.2fMB/s", duration, - (double) totalBytes / 1024 / 1024, (double) totalBytes / duration - * 1000 / 1024 / 1024)); - printlnWithTimestamp(String.format( - "Compressed size: %.2fMB...compressed thrpt: %.2fMB/s.", - (double) fsize / 1024 / 1024, (double) fsize / duration * 1000 / 1024 - / 1024)); - } - - private void fillBuffer(Random rng, BytesWritable bw, byte[] tmp, int len) { - int n = 0; - while (n < len) { - byte[] word = dictionary[rng.nextInt(dictionary.length)]; - int l = Math.min(word.length, len - n); - System.arraycopy(word, 0, tmp, n, l); - n += l; - } - bw.set(tmp, 0, len); - } - - private void timeWrite(Path path, KVAppendable appendable, int baseKlen, - int baseVlen, long fileSize) throws IOException { - int maxKlen = baseKlen * 2; - int maxVlen = baseVlen * 2; - BytesWritable key = new BytesWritable(); - BytesWritable value = new BytesWritable(); - byte[] keyBuffer = new byte[maxKlen]; - 
byte[] valueBuffer = new byte[maxVlen]; - Random rng = new Random(options.seed); - long totalBytes = 0; - printlnWithTimestamp("Start writing: " + path.getName() + "..."); - startTime(); - - for (long i = 0; true; ++i) { - if (i % 1000 == 0) { // test the size for every 1000 rows. - if (fs.getFileStatus(path).getLen() >= fileSize) { - break; - } - } - int klen = rng.nextInt(baseKlen) + baseKlen; - int vlen = rng.nextInt(baseVlen) + baseVlen; - fillBuffer(rng, key, keyBuffer, klen); - fillBuffer(rng, value, valueBuffer, vlen); - key.set(keyBuffer, 0, klen); - value.set(valueBuffer, 0, vlen); - appendable.append(key, value); - totalBytes += klen; - totalBytes += vlen; - } - stopTime(); - appendable.close(); - reportStats(path, totalBytes); - } - - private void timeRead(Path path, KVReadable readable) throws IOException { - printlnWithTimestamp("Start reading: " + path.getName() + "..."); - long totalBytes = 0; - startTime(); - for (; readable.next();) { - totalBytes += readable.getKeyLength(); - totalBytes += readable.getValueLength(); - } - stopTime(); - readable.close(); - reportStats(path, totalBytes); - } - - private void createTFile(String parameters, String compress) - throws IOException { - System.out.println("=== TFile: Creation (" + parameters + ") === "); - Path path = new Path(options.rootDir, "TFile.Performance"); - KVAppendable appendable = - new TFileAppendable(fs, path, compress, options.minBlockSize, - options.osOutputBufferSize, conf); - timeWrite(path, appendable, options.keyLength, options.valueLength, - options.fileSize); - } - - private void readTFile(String parameters, boolean delFile) throws IOException { - System.out.println("=== TFile: Reading (" + parameters + ") === "); - { - Path path = new Path(options.rootDir, "TFile.Performance"); - KVReadable readable = - new TFileReadable(fs, path, options.osInputBufferSize, conf); - timeRead(path, readable); - if (delFile) { - if (fs.exists(path)) { - fs.delete(path, true); - } - } - } - } - - private void createSeqFile(String parameters, String compress) - throws IOException { - System.out.println("=== SeqFile: Creation (" + parameters + ") === "); - Path path = new Path(options.rootDir, "SeqFile.Performance"); - KVAppendable appendable = - new SeqFileAppendable(fs, path, options.osOutputBufferSize, compress, - options.minBlockSize); - timeWrite(path, appendable, options.keyLength, options.valueLength, - options.fileSize); - } - - private void readSeqFile(String parameters, boolean delFile) - throws IOException { - System.out.println("=== SeqFile: Reading (" + parameters + ") === "); - Path path = new Path(options.rootDir, "SeqFile.Performance"); - KVReadable readable = - new SeqFileReadable(fs, path, options.osInputBufferSize); - timeRead(path, readable); - if (delFile) { - if (fs.exists(path)) { - fs.delete(path, true); - } - } - } - - private void compareRun(String compress) throws IOException { - String[] supported = TFile.getSupportedCompressionAlgorithms(); - boolean proceed = false; - for (String c : supported) { - if (c.equals(compress)) { - proceed = true; - break; - } - } - - if (!proceed) { - System.out.println("Skipped for " + compress); - return; - } - - options.compress = compress; - String parameters = parameters2String(options); - createSeqFile(parameters, compress); - readSeqFile(parameters, true); - createTFile(parameters, compress); - readTFile(parameters, true); - createTFile(parameters, compress); - readTFile(parameters, true); - createSeqFile(parameters, compress); - readSeqFile(parameters, true); - } 
- - public void testRunComparisons() throws IOException { - String[] compresses = new String[] { "none", "lzo", "gz" }; - for (String compress : compresses) { - if (compress.equals("none")) { - conf - .setInt("tfile.fs.input.buffer.size", options.fsInputBufferSizeNone); - conf.setInt("tfile.fs.output.buffer.size", - options.fsOutputBufferSizeNone); - } - else if (compress.equals("lzo")) { - conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSizeLzo); - conf.setInt("tfile.fs.output.buffer.size", - options.fsOutputBufferSizeLzo); - } - else { - conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSizeGz); - conf - .setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSizeGz); - } - compareRun(compress); - } - } - - private static String parameters2String(MyOptions options) { - return String - .format( - "KLEN: %d-%d... VLEN: %d-%d...MinBlkSize: %.2fKB...Target Size: %.2fMB...Compression: ...%s", - options.keyLength, options.keyLength * 2, options.valueLength, - options.valueLength * 2, (double) options.minBlockSize / 1024, - (double) options.fileSize / 1024 / 1024, options.compress); - } - - private static class MyOptions { - String rootDir = - System - .getProperty("test.build.data", "/tmp/tfile-test"); - String compress = "gz"; - String format = "tfile"; - int dictSize = 1000; - int minWordLen = 5; - int maxWordLen = 20; - int keyLength = 50; - int valueLength = 100; - int minBlockSize = 256 * 1024; - int fsOutputBufferSize = 1; - int fsInputBufferSize = 0; - // special variable only for unit testing. - int fsInputBufferSizeNone = 0; - int fsInputBufferSizeGz = 0; - int fsInputBufferSizeLzo = 0; - int fsOutputBufferSizeNone = 1; - int fsOutputBufferSizeGz = 1; - int fsOutputBufferSizeLzo = 1; - - // un-exposed parameters. 
- int osInputBufferSize = 64 * 1024; - int osOutputBufferSize = 64 * 1024; - - long fileSize = 3 * 1024 * 1024; - long seed; - - static final int OP_CREATE = 1; - static final int OP_READ = 2; - int op = OP_READ; - - boolean proceed = false; - - public MyOptions(String[] args) { - seed = System.nanoTime(); - - try { - Options opts = buildOptions(); - CommandLineParser parser = new GnuParser(); - CommandLine line = parser.parse(opts, args, true); - processOptions(line, opts); - validateOptions(); - } - catch (ParseException e) { - System.out.println(e.getMessage()); - System.out.println("Try \"--help\" option for details."); - setStopProceed(); - } - } - - public boolean proceed() { - return proceed; - } - - private Options buildOptions() { - Option compress = - OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz]") - .hasArg().withDescription("compression scheme").create('c'); - - Option ditSize = - OptionBuilder.withLongOpt("dict").withArgName("size").hasArg() - .withDescription("number of dictionary entries").create('d'); - - Option fileSize = - OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB") - .hasArg().withDescription("target size of the file (in MB).") - .create('s'); - - Option format = - OptionBuilder.withLongOpt("format").withArgName("[tfile|seqfile]") - .hasArg().withDescription("choose TFile or SeqFile").create('f'); - - Option fsInputBufferSz = - OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size") - .hasArg().withDescription( - "size of the file system input buffer (in bytes).").create( - 'i'); - - Option fsOutputBufferSize = - OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size") - .hasArg().withDescription( - "size of the file system output buffer (in bytes).").create( - 'o'); - - Option keyLen = - OptionBuilder - .withLongOpt("key-length") - .withArgName("length") - .hasArg() - .withDescription( - "base length of the key (in bytes), actual length varies in [base, 2*base)") - .create('k'); - - Option valueLen = - OptionBuilder - .withLongOpt("value-length") - .withArgName("length") - .hasArg() - .withDescription( - "base length of the value (in bytes), actual length varies in [base, 2*base)") - .create('v'); - - Option wordLen = - OptionBuilder.withLongOpt("word-length").withArgName("min,max") - .hasArg().withDescription( - "range of dictionary word length (in bytes)").create('w'); - - Option blockSz = - OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg() - .withDescription("minimum block size (in KB)").create('b'); - - Option seed = - OptionBuilder.withLongOpt("seed").withArgName("long-int").hasArg() - .withDescription("specify the seed").create('S'); - - Option operation = - OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg() - .withDescription( - "action: read-only, create-only, read-after-create").create( - 'x'); - - Option rootDir = - OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg() - .withDescription( - "specify root directory where files will be created.") - .create('r'); - - Option help = - OptionBuilder.withLongOpt("help").hasArg(false).withDescription( - "show this screen").create("h"); - - return new Options().addOption(compress).addOption(ditSize).addOption( - fileSize).addOption(format).addOption(fsInputBufferSz).addOption( - fsOutputBufferSize).addOption(keyLen).addOption(wordLen).addOption( - blockSz).addOption(rootDir).addOption(valueLen).addOption(operation) - .addOption(help); - - } - - private void processOptions(CommandLine line, Options opts) - 
throws ParseException { - // --help -h and --version -V must be processed first. - if (line.hasOption('h')) { - HelpFormatter formatter = new HelpFormatter(); - System.out.println("TFile and SeqFile benchmark."); - System.out.println(); - formatter.printHelp(100, - "java ... TestTFileSeqFileComparison [options]", - "\nSupported options:", opts, ""); - return; - } - - if (line.hasOption('c')) { - compress = line.getOptionValue('c'); - } - - if (line.hasOption('d')) { - dictSize = Integer.parseInt(line.getOptionValue('d')); - } - - if (line.hasOption('s')) { - fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024; - } - - if (line.hasOption('f')) { - format = line.getOptionValue('f'); - } - - if (line.hasOption('i')) { - fsInputBufferSize = Integer.parseInt(line.getOptionValue('i')); - } - - if (line.hasOption('o')) { - fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o')); - } - - if (line.hasOption('k')) { - keyLength = Integer.parseInt(line.getOptionValue('k')); - } - - if (line.hasOption('v')) { - valueLength = Integer.parseInt(line.getOptionValue('v')); - } - - if (line.hasOption('b')) { - minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024; - } - - if (line.hasOption('r')) { - rootDir = line.getOptionValue('r'); - } - - if (line.hasOption('S')) { - seed = Long.parseLong(line.getOptionValue('S')); - } - - if (line.hasOption('w')) { - String min_max = line.getOptionValue('w'); - StringTokenizer st = new StringTokenizer(min_max, " \t,"); - if (st.countTokens() != 2) { - throw new ParseException("Bad word length specification: " + min_max); - } - minWordLen = Integer.parseInt(st.nextToken()); - maxWordLen = Integer.parseInt(st.nextToken()); - } - - if (line.hasOption('x')) { - String strOp = line.getOptionValue('x'); - if (strOp.equals("r")) { - op = OP_READ; - } - else if (strOp.equals("w")) { - op = OP_CREATE; - } - else if (strOp.equals("rw")) { - op = OP_CREATE | OP_READ; - } - else { - throw new ParseException("Unknown action specifier: " + strOp); - } - } - - proceed = true; - } - - private void validateOptions() throws ParseException { - if (!compress.equals("none") && !compress.equals("lzo") - && !compress.equals("gz")) { - throw new ParseException("Unknown compression scheme: " + compress); - } - - if (!format.equals("tfile") && !format.equals("seqfile")) { - throw new ParseException("Unknown file format: " + format); - } - - if (minWordLen >= maxWordLen) { - throw new ParseException( - "Max word length must be greater than min word length."); - } - return; - } - - private void setStopProceed() { - proceed = false; - } - - public boolean doCreate() { - return (op & OP_CREATE) != 0; - } - - public boolean doRead() { - return (op & OP_READ) != 0; - } - } - - public static void main(String[] args) throws IOException { - TestTFileSeqFileComparison testCase = new TestTFileSeqFileComparison(); - MyOptions options = new MyOptions(args); - if (options.proceed == false) { - return; - } - testCase.options = options; - String parameters = parameters2String(options); - - testCase.setUp(); - if (testCase.options.format.equals("tfile")) { - if (options.doCreate()) { - testCase.createTFile(parameters, options.compress); - } - if (options.doRead()) { - testCase.readTFile(parameters, options.doCreate()); - } - } - else { - if (options.doCreate()) { - testCase.createSeqFile(parameters, options.compress); - } - if (options.doRead()) { - testCase.readSeqFile(parameters, options.doCreate()); - } - } - testCase.tearDown(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java deleted file mode 100644 index aad563d..0000000 --- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSplit.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.io.file.tfile; - -import java.io.IOException; -import java.util.Random; - -import junit.framework.TestCase; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.file.tfile.DTFile.Reader; -import org.apache.hadoop.io.file.tfile.DTFile.Writer; -import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner; -import org.junit.Assert; - -public class TestTFileSplit extends TestCase { - private static String ROOT = - System.getProperty("test.build.data", "/tmp/tfile-test"); - - private final static int BLOCK_SIZE = 64 * 1024; - - private static final String KEY = "key"; - private static final String VALUE = "value"; - - private FileSystem fs; - private Configuration conf; - private Path path; - private Random random = new Random(); - - private String comparator = "memcmp"; - private String outputFile = "TestTFileSplit"; - - void createFile(int count, String compress) throws IOException { - conf = new Configuration(); - path = new Path(ROOT, outputFile + "." 
+ compress); - fs = path.getFileSystem(conf); - FSDataOutputStream out = fs.create(path); - Writer writer = new Writer(out, BLOCK_SIZE, compress, comparator, conf); - - int nx; - for (nx = 0; nx < count; nx++) { - byte[] key = composeSortedKey(KEY, count, nx).getBytes(); - byte[] value = (VALUE + nx).getBytes(); - writer.append(key, value); - } - writer.close(); - out.close(); - } - - void readFile() throws IOException { - long fileLength = fs.getFileStatus(path).getLen(); - int numSplit = 10; - long splitSize = fileLength / numSplit + 1; - - Reader reader = - new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); - long offset = 0; - long rowCount = 0; - BytesWritable key, value; - for (int i = 0; i < numSplit; ++i, offset += splitSize) { - Scanner scanner = reader.createScannerByByteRange(offset, splitSize); - int count = 0; - key = new BytesWritable(); - value = new BytesWritable(); - while (!scanner.atEnd()) { - scanner.entry().get(key, value); - ++count; - scanner.advance(); - } - scanner.close(); - Assert.assertTrue(count > 0); - rowCount += count; - } - Assert.assertEquals(rowCount, reader.getEntryCount()); - reader.close(); - } - - /* Similar to readFile(), tests the scanner created - * by record numbers rather than the offsets. - */ - void readRowSplits(int numSplits) throws IOException { - - Reader reader = - new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); - - long totalRecords = reader.getEntryCount(); - for (int i=0; i<numSplits; i++) { - long startRec = i*totalRecords/numSplits; - long endRec = (i+1)*totalRecords/numSplits; - if (i == numSplits-1) { - endRec = totalRecords; - } - Scanner scanner = reader.createScannerByRecordNum(startRec, endRec); - int count = 0; - BytesWritable key = new BytesWritable(); - BytesWritable value = new BytesWritable(); - long x=startRec; - while (!scanner.atEnd()) { - assertEquals("Incorrect RecNum returned by scanner", scanner.getRecordNum(), x); - scanner.entry().get(key, value); - ++count; - assertEquals("Incorrect RecNum returned by scanner", scanner.getRecordNum(), x); - scanner.advance(); - ++x; - } - scanner.close(); - Assert.assertTrue(count == (endRec - startRec)); - } - // make sure specifying range at the end gives zero records. 
- Scanner scanner = reader.createScannerByRecordNum(totalRecords, -1); - Assert.assertTrue(scanner.atEnd()); - } - - static String composeSortedKey(String prefix, int total, int value) { - return String.format("%s%010d", prefix, value); - } - - void checkRecNums() throws IOException { - long fileLen = fs.getFileStatus(path).getLen(); - Reader reader = new Reader(fs.open(path), fileLen, conf); - long totalRecs = reader.getEntryCount(); - long begin = random.nextLong() % (totalRecs / 2); - if (begin < 0) - begin += (totalRecs / 2); - long end = random.nextLong() % (totalRecs / 2); - if (end < 0) - end += (totalRecs / 2); - end += (totalRecs / 2) + 1; - - assertEquals("RecNum for offset=0 should be 0", 0, reader - .getRecordNumNear(0)); - for (long x : new long[] { fileLen, fileLen + 1, 2 * fileLen }) { - assertEquals("RecNum for offset>=fileLen should be total entries", - totalRecs, reader.getRecordNumNear(x)); - } - - for (long i = 0; i < 100; ++i) { - assertEquals("Locaton to RecNum conversion not symmetric", i, reader - .getRecordNumByLocation(reader.getLocationByRecordNum(i))); - } - - for (long i = 1; i < 100; ++i) { - long x = totalRecs - i; - assertEquals("Locaton to RecNum conversion not symmetric", x, reader - .getRecordNumByLocation(reader.getLocationByRecordNum(x))); - } - - for (long i = begin; i < end; ++i) { - assertEquals("Locaton to RecNum conversion not symmetric", i, reader - .getRecordNumByLocation(reader.getLocationByRecordNum(i))); - } - - for (int i = 0; i < 1000; ++i) { - long x = random.nextLong() % totalRecs; - if (x < 0) x += totalRecs; - assertEquals("Locaton to RecNum conversion not symmetric", x, reader - .getRecordNumByLocation(reader.getLocationByRecordNum(x))); - } - } - - public void testSplit() throws IOException { - System.out.println("testSplit"); - createFile(100000, Compression.Algorithm.NONE.getName()); - checkRecNums(); - readFile(); - readRowSplits(10); - fs.delete(path, true); - createFile(500000, Compression.Algorithm.GZ.getName()); - checkRecNums(); - readFile(); - readRowSplits(83); - fs.delete(path, true); - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java deleted file mode 100644 index 2e0506c..0000000 --- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileStreams.java +++ /dev/null @@ -1,423 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hadoop.io.file.tfile; - -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.util.Random; - -import junit.framework.TestCase; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.file.tfile.DTFile.Reader; -import org.apache.hadoop.io.file.tfile.DTFile.Writer; -import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner; -import org.junit.Assert; - -/** - * - * Streaming interfaces test case class using GZ compression codec, base class - * of none and LZO compression classes. - * - */ - -public class TestTFileStreams extends TestCase { - private static String ROOT = - System.getProperty("test.build.data", "/tmp/tfile-test"); - - private final static int BLOCK_SIZE = 512; - private final static int K = 1024; - private final static int M = K * K; - protected boolean skip = false; - private FileSystem fs; - private Configuration conf; - private Path path; - private FSDataOutputStream out; - Writer writer; - - private String compression = Compression.Algorithm.GZ.getName(); - private String comparator = "memcmp"; - private final String outputFile = getClass().getSimpleName(); - - public void init(String compression, String comparator) { - this.compression = compression; - this.comparator = comparator; - } - - @Override - public void setUp() throws IOException { - conf = new Configuration(); - path = new Path(ROOT, outputFile); - fs = path.getFileSystem(conf); - out = fs.create(path); - writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf); - } - - @Override - public void tearDown() throws IOException { - if (!skip) { - try { - closeOutput(); - } catch (Exception e) { - // no-op - } - fs.delete(path, true); - } - } - - public void testNoEntry() throws IOException { - if (skip) - return; - closeOutput(); - TestDTFileByteArrays.readRecords(fs, path, 0, conf); - } - - public void testOneEntryKnownLength() throws IOException { - if (skip) - return; - writeRecords(1, true, true); - - TestDTFileByteArrays.readRecords(fs, path, 1, conf); - } - - public void testOneEntryUnknownLength() throws IOException { - if (skip) - return; - writeRecords(1, false, false); - - // TODO: will throw exception at getValueLength, it's inconsistent though; - // getKeyLength returns a value correctly, though initial length is -1 - TestDTFileByteArrays.readRecords(fs, path, 1, conf); - } - - // known key length, unknown value length - public void testOneEntryMixedLengths1() throws IOException { - if (skip) - return; - writeRecords(1, true, false); - - TestDTFileByteArrays.readRecords(fs, path, 1, conf); - } - - // unknown key length, known value length - public void testOneEntryMixedLengths2() throws IOException { - if (skip) - return; - writeRecords(1, false, true); - - TestDTFileByteArrays.readRecords(fs, path, 1, conf); - } - - public void testTwoEntriesKnownLength() throws IOException { - if (skip) - return; - writeRecords(2, true, true); - - TestDTFileByteArrays.readRecords(fs, path, 2, conf); - } - - // Negative test - public void testFailureAddKeyWithoutValue() throws IOException { - if (skip) - return; - DataOutputStream dos = writer.prepareAppendKey(-1); - dos.write("key0".getBytes()); - try { - closeOutput(); - fail("Cannot add only a key without a value. 
"); - } - catch (IllegalStateException e) { - // noop, expecting an exception - } - } - - public void testFailureAddValueWithoutKey() throws IOException { - if (skip) - return; - DataOutputStream outValue = null; - try { - outValue = writer.prepareAppendValue(6); - outValue.write("value0".getBytes()); - fail("Cannot add a value without adding key first. "); - } - catch (Exception e) { - // noop, expecting an exception - } - finally { - if (outValue != null) { - outValue.close(); - } - } - } - - public void testFailureOneEntryKnownLength() throws IOException { - if (skip) - return; - DataOutputStream outKey = writer.prepareAppendKey(2); - try { - outKey.write("key0".getBytes()); - fail("Specified key length mismatched the actual key length."); - } - catch (IOException e) { - // noop, expecting an exception - } - - DataOutputStream outValue = null; - try { - outValue = writer.prepareAppendValue(6); - outValue.write("value0".getBytes()); - } - catch (Exception e) { - // noop, expecting an exception - } - } - - public void testFailureKeyTooLong() throws IOException { - if (skip) - return; - DataOutputStream outKey = writer.prepareAppendKey(2); - try { - outKey.write("key0".getBytes()); - outKey.close(); - Assert.fail("Key is longer than requested."); - } - catch (Exception e) { - // noop, expecting an exception - } - finally { - } - } - - public void testFailureKeyTooShort() throws IOException { - if (skip) - return; - DataOutputStream outKey = writer.prepareAppendKey(4); - outKey.write("key0".getBytes()); - outKey.close(); - DataOutputStream outValue = writer.prepareAppendValue(15); - try { - outValue.write("value0".getBytes()); - outValue.close(); - Assert.fail("Value is shorter than expected."); - } - catch (Exception e) { - // noop, expecting an exception - } - finally { - } - } - - public void testFailureValueTooLong() throws IOException { - if (skip) - return; - DataOutputStream outKey = writer.prepareAppendKey(4); - outKey.write("key0".getBytes()); - outKey.close(); - DataOutputStream outValue = writer.prepareAppendValue(3); - try { - outValue.write("value0".getBytes()); - outValue.close(); - Assert.fail("Value is longer than expected."); - } - catch (Exception e) { - // noop, expecting an exception - } - - try { - outKey.close(); - outKey.close(); - } - catch (Exception e) { - Assert.fail("Second or more close() should have no effect."); - } - } - - public void testFailureValueTooShort() throws IOException { - if (skip) - return; - DataOutputStream outKey = writer.prepareAppendKey(8); - try { - outKey.write("key0".getBytes()); - outKey.close(); - Assert.fail("Key is shorter than expected."); - } - catch (Exception e) { - // noop, expecting an exception - } - finally { - } - } - - public void testFailureCloseKeyStreamManyTimesInWriter() throws IOException { - if (skip) - return; - DataOutputStream outKey = writer.prepareAppendKey(4); - try { - outKey.write("key0".getBytes()); - outKey.close(); - } - catch (Exception e) { - // noop, expecting an exception - } - finally { - try { - outKey.close(); - } - catch (Exception e) { - // no-op - } - } - outKey.close(); - outKey.close(); - Assert.assertTrue("Multiple close should have no effect.", true); - } - - public void testFailureKeyLongerThan64K() throws IOException { - if (skip) - return; - try { - DataOutputStream outKey = writer.prepareAppendKey(64 * K + 1); - Assert.fail("Failed to handle key longer than 64K."); - } - catch (IndexOutOfBoundsException e) { - // noop, expecting exceptions - } - closeOutput(); - } - - public void 
testFailureKeyLongerThan64K_2() throws IOException { - if (skip) - return; - DataOutputStream outKey = writer.prepareAppendKey(-1); - try { - byte[] buf = new byte[K]; - Random rand = new Random(); - for (int nx = 0; nx < K + 2; nx++) { - rand.nextBytes(buf); - outKey.write(buf); - } - outKey.close(); - Assert.fail("Failed to handle key longer than 64K."); - } - catch (EOFException e) { - // noop, expecting exceptions - } - finally { - try { - closeOutput(); - } - catch (Exception e) { - // no-op - } - } - } - - public void testFailureNegativeOffset() throws IOException { - if (skip) - return; - writeRecords(2, true, true); - - Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); - Scanner scanner = reader.createScanner(); - byte[] buf = new byte[K]; - try { - scanner.entry().getKey(buf, -1); - Assert.fail("Failed to handle key negative offset."); - } - catch (Exception e) { - // noop, expecting exceptions - } - finally { - } - scanner.close(); - reader.close(); - } - - /** - * Verify that the compressed data size is less than raw data size. - * - * @throws IOException - */ - public void testFailureCompressionNotWorking() throws IOException { - if (skip) - return; - long rawDataSize = writeRecords(10000, false, false, false); - if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) { - Assert.assertTrue(out.getPos() < rawDataSize); - } - closeOutput(); - } - - public void testFailureCompressionNotWorking2() throws IOException { - if (skip) - return; - long rawDataSize = writeRecords(10000, true, true, false); - if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) { - Assert.assertTrue(out.getPos() < rawDataSize); - } - closeOutput(); - } - - private long writeRecords(int count, boolean knownKeyLength, - boolean knownValueLength, boolean close) throws IOException { - long rawDataSize = 0; - for (int nx = 0; nx < count; nx++) { - String key = TestDTFileByteArrays.composeSortedKey("key", nx); - DataOutputStream outKey = - writer.prepareAppendKey(knownKeyLength ? key.length() : -1); - outKey.write(key.getBytes()); - outKey.close(); - String value = "value" + nx; - DataOutputStream outValue = - writer.prepareAppendValue(knownValueLength ? 
value.length() : -1); - outValue.write(value.getBytes()); - outValue.close(); - rawDataSize += - WritableUtils.getVIntSize(key.getBytes().length) - + key.getBytes().length - + WritableUtils.getVIntSize(value.getBytes().length) - + value.getBytes().length; - } - if (close) { - closeOutput(); - } - return rawDataSize; - } - - private long writeRecords(int count, boolean knownKeyLength, - boolean knownValueLength) throws IOException { - return writeRecords(count, knownKeyLength, knownValueLength, true); - } - - private void closeOutput() throws IOException { - if (writer != null) { - writer.close(); - writer = null; - } - if (out != null) { - out.close(); - out = null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java b/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java deleted file mode 100644 index a58f649..0000000 --- a/contrib/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java +++ /dev/null @@ -1,239 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hadoop.io.file.tfile; - -import java.io.IOException; - -import junit.framework.TestCase; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.file.tfile.DTFile.Reader; -import org.apache.hadoop.io.file.tfile.DTFile.Writer; -import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner; -import org.junit.Assert; - -public class TestTFileUnsortedByteArrays extends TestCase { - private static String ROOT = - System.getProperty("test.build.data", "/tmp/tfile-test"); - - - private final static int BLOCK_SIZE = 512; - private final static int BUF_SIZE = 64; - - private FileSystem fs; - private Configuration conf; - private Path path; - private FSDataOutputStream out; - private Writer writer; - - private String compression = Compression.Algorithm.GZ.getName(); - private String outputFile = "TFileTestUnsorted"; - /* - * pre-sampled numbers of records in one block, based on the given the - * generated key and value strings - */ - private int records1stBlock = 4314; - private int records2ndBlock = 4108; - - public void init(String compression, String outputFile, - int numRecords1stBlock, int numRecords2ndBlock) { - this.compression = compression; - this.outputFile = outputFile; - this.records1stBlock = numRecords1stBlock; - this.records2ndBlock = numRecords2ndBlock; - } - - @Override - public void setUp() throws IOException { - conf = new Configuration(); - path = new Path(ROOT, outputFile); - fs = path.getFileSystem(conf); - out = fs.create(path); - writer = new Writer(out, BLOCK_SIZE, compression, null, conf); - writer.append("keyZ".getBytes(), "valueZ".getBytes()); - writer.append("keyM".getBytes(), "valueM".getBytes()); - writer.append("keyN".getBytes(), "valueN".getBytes()); - writer.append("keyA".getBytes(), "valueA".getBytes()); - closeOutput(); - } - - @Override - public void tearDown() throws IOException { - fs.delete(path, true); - } - - // we still can scan records in an unsorted TFile - public void testFailureScannerWithKeys() throws IOException { - Reader reader = - new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); - Assert.assertFalse(reader.isSorted()); - Assert.assertEquals((int) reader.getEntryCount(), 4); - - try { - Scanner scanner = - reader.createScannerByKey("aaa".getBytes(), "zzz".getBytes()); - Assert - .fail("Failed to catch creating scanner with keys on unsorted file."); - } - catch (RuntimeException e) { - } - finally { - reader.close(); - } - } - - // we still can scan records in an unsorted TFile - public void testScan() throws IOException { - Reader reader = - new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); - Assert.assertFalse(reader.isSorted()); - Assert.assertEquals((int) reader.getEntryCount(), 4); - - Scanner scanner = reader.createScanner(); - - try { - - // read key and value - byte[] kbuf = new byte[BUF_SIZE]; - int klen = scanner.entry().getKeyLength(); - scanner.entry().getKey(kbuf); - Assert.assertEquals(new String(kbuf, 0, klen), "keyZ"); - - byte[] vbuf = new byte[BUF_SIZE]; - int vlen = scanner.entry().getValueLength(); - scanner.entry().getValue(vbuf); - Assert.assertEquals(new String(vbuf, 0, vlen), "valueZ"); - - scanner.advance(); - - // now try get value first - vbuf = new byte[BUF_SIZE]; - vlen = scanner.entry().getValueLength(); - scanner.entry().getValue(vbuf); - Assert.assertEquals(new String(vbuf, 0, vlen), "valueM"); - - kbuf = new 
byte[BUF_SIZE]; - klen = scanner.entry().getKeyLength(); - scanner.entry().getKey(kbuf); - Assert.assertEquals(new String(kbuf, 0, klen), "keyM"); - } - finally { - scanner.close(); - reader.close(); - } - } - - // we still can scan records in an unsorted TFile - public void testScanRange() throws IOException { - Reader reader = - new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); - Assert.assertFalse(reader.isSorted()); - Assert.assertEquals((int) reader.getEntryCount(), 4); - - Scanner scanner = reader.createScanner(); - - try { - - // read key and value - byte[] kbuf = new byte[BUF_SIZE]; - int klen = scanner.entry().getKeyLength(); - scanner.entry().getKey(kbuf); - Assert.assertEquals(new String(kbuf, 0, klen), "keyZ"); - - byte[] vbuf = new byte[BUF_SIZE]; - int vlen = scanner.entry().getValueLength(); - scanner.entry().getValue(vbuf); - Assert.assertEquals(new String(vbuf, 0, vlen), "valueZ"); - - scanner.advance(); - - // now try get value first - vbuf = new byte[BUF_SIZE]; - vlen = scanner.entry().getValueLength(); - scanner.entry().getValue(vbuf); - Assert.assertEquals(new String(vbuf, 0, vlen), "valueM"); - - kbuf = new byte[BUF_SIZE]; - klen = scanner.entry().getKeyLength(); - scanner.entry().getKey(kbuf); - Assert.assertEquals(new String(kbuf, 0, klen), "keyM"); - } - finally { - scanner.close(); - reader.close(); - } - } - - public void testFailureSeek() throws IOException { - Reader reader = - new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); - Scanner scanner = reader.createScanner(); - - try { - // can't find ceil - try { - scanner.lowerBound("keyN".getBytes()); - Assert.fail("Cannot search in a unsorted TFile!"); - } - catch (Exception e) { - // noop, expecting excetions - } - finally { - } - - // can't find higher - try { - scanner.upperBound("keyA".getBytes()); - Assert.fail("Cannot search higher in a unsorted TFile!"); - } - catch (Exception e) { - // noop, expecting excetions - } - finally { - } - - // can't seek - try { - scanner.seekTo("keyM".getBytes()); - Assert.fail("Cannot search a unsorted TFile!"); - } - catch (Exception e) { - // noop, expecting excetions - } - finally { - } - } - finally { - scanner.close(); - reader.close(); - } - } - - private void closeOutput() throws IOException { - if (writer != null) { - writer.close(); - writer = null; - out.close(); - out = null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java b/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java new file mode 100644 index 0000000..2f47a76 --- /dev/null +++ b/library/src/main/java/org/apache/hadoop/io/file/tfile/CacheManager.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.lang.management.ManagementFactory; +import java.util.Collection; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.io.file.tfile.DTBCFile.Reader.BlockReader; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.Weigher; + +/** + * A single global managed cache + * User can limit the cache size by num of entries, memory size (bytes) or percentage of total heap size + * <br> + * <br> + * Please refer to <a href="https://code.google.com/p/guava-libraries/wiki/CachesExplained">Guava Cache</a> fir details + * <br> + * <br> + * It keeps {@link String} as key and {@link BlockReader} as value + * + * @since 2.0.0 + */ +public class CacheManager +{ + public static final int STRING_OVERHEAD = 64; + + public static final int BLOCK_READER_OVERHEAD = 368; + + public static final float DEFAULT_HEAP_MEMORY_PERCENTAGE = 0.25f; + + private static Cache<String, BlockReader> singleCache; + + private static boolean enableStats = false; + + public static final Cache<String, BlockReader> buildCache(CacheBuilder builder) { + if (singleCache != null) { + singleCache.cleanUp(); + } + if (enableStats) { + //todo: when we upgrade to a newer guava version we can use this + // builder.recordStats(); + } + singleCache = builder.build(); + return singleCache; + } + + /** + * (Re)Create the cache by limiting the maximum entries + * @param concurrencyLevel + * @param initialCapacity + * @param maximunSize + * @return The cache. + */ + public static final Cache<String, BlockReader> createCache(int concurrencyLevel,int initialCapacity, int maximunSize){ + CacheBuilder builder = CacheBuilder.newBuilder(). + concurrencyLevel(concurrencyLevel). + initialCapacity(initialCapacity). + maximumSize(maximunSize); + + return buildCache(builder); + } + + + /** + * (Re)Create the cache by limiting the memory(in bytes) + * @param concurrencyLevel + * @param initialCapacity + * @param maximumMemory + * @return The cache. + */ + public static final Cache<String, BlockReader> createCache(int concurrencyLevel,int initialCapacity, long maximumMemory){ + + CacheBuilder builder = CacheBuilder.newBuilder(). + concurrencyLevel(concurrencyLevel). + initialCapacity(initialCapacity). + maximumWeight(maximumMemory).weigher(new KVWeigher()); + + return buildCache(builder); + } + + /** + * (Re)Create the cache by limiting percentage of the total heap memory + * @param concurrencyLevel + * @param initialCapacity + * @param heapMemPercentage + * @return The cache. + */ + public static final Cache<String, BlockReader> createCache(int concurrencyLevel,int initialCapacity, float heapMemPercentage){ + CacheBuilder builder = CacheBuilder.newBuilder(). + concurrencyLevel(concurrencyLevel). + initialCapacity(initialCapacity). 
+ maximumWeight((long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * heapMemPercentage)).weigher(new KVWeigher()); + return buildCache(builder); + } + + public static final void createDefaultCache(){ + + long availableMemory = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() * DEFAULT_HEAP_MEMORY_PERCENTAGE); + CacheBuilder<String, BlockReader> builder = CacheBuilder.newBuilder().maximumWeight(availableMemory).weigher(new KVWeigher()); + + singleCache = buildCache(builder); + } + + public static final void put(String key, BlockReader blk){ + if (singleCache == null) { + createDefaultCache(); + } + singleCache.put(key, blk); + } + + public static final BlockReader get(String key){ + if (singleCache == null) { + return null; + } + return singleCache.getIfPresent(key); + } + + public static final void invalidateKeys(Collection<String> keys) + { + if (singleCache != null) + singleCache.invalidateAll(keys); + } + + public static final long getCacheSize() { + if (singleCache != null) + return singleCache.size(); + return 0; + } + + public static final class KVWeigher implements Weigher<String, BlockReader> { + + @Override + public int weigh(String key, BlockReader value) + { + return (STRING_OVERHEAD + BLOCK_READER_OVERHEAD) + + key.getBytes().length + + value.getBlockDataInputStream().getBuf().length; + } + + } + + @VisibleForTesting + protected static Cache<String, BlockReader> getCache() { + return singleCache; + } + + public static final void setEnableStats(boolean enable) { + enableStats = enable; + } + + public static void main(String[] args) + { + + //code to eitsmate the overhead of the instance of the key value objects + // it depends on hbase file +// System.out.println(ClassSize.estimateBase(BlockReader.class, true) + +// ClassSize.estimateBase(Algorithm.class, true) + +// ClassSize.estimateBase(RBlockState.class, true) + +// ClassSize.estimateBase(ReusableByteArrayInputStream.class, true) + +// ClassSize.estimateBase(BlockRegion.class, true)); +// +// System.out.println( +// ClassSize.estimateBase(String.class, true)); + } + +}
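[Editor's note] The newly added CacheManager keeps one process-wide Guava cache of TFile block readers, bounded either by entry count, by bytes, or by a fraction of the heap. As a minimal usage sketch, not part of the patch (the key string is illustrative; the calls match the methods added above), a caller could cap the cache by entry count and query it:

import java.util.Collections;

import org.apache.hadoop.io.file.tfile.CacheManager;
import org.apache.hadoop.io.file.tfile.DTBCFile.Reader.BlockReader;

public class CacheManagerUsageSketch {
  public static void main(String[] args) {
    // The int third argument selects the entry-count overload: concurrency
    // level 4, initial capacity 1024, at most 10,000 cached block readers.
    CacheManager.createCache(4, 1024, 10000);

    // Readers normally populate the cache with CacheManager.put(key, blockReader);
    // a lookup for an absent key simply returns null.
    BlockReader missing = CacheManager.get("somefile#block0");
    System.out.println("cached? " + (missing != null));

    CacheManager.invalidateKeys(Collections.singletonList("somefile#block0"));
    System.out.println("entries in cache: " + CacheManager.getCacheSize());
  }
}

Passing a long instead of an int as the third argument would switch to the byte-weighted overload, and a float selects the heap-percentage variant; if no cache has been created, the first put() falls back to createDefaultCache(), which weighs entries against 25% of the maximum heap.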
