http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java new file mode 100644 index 0000000..003d9eb --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/DTFileTest.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import org.junit.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +public class DTFileTest +{ + private Configuration conf; + private Path path; + private FileSystem fs; + private NanoTimer timer; + private Random rng; + private RandomDistribution.DiscreteRNG keyLenGen; + private KVGenerator kvGen; + + + static class TestConf { + public int minWordLen = 5; + public int maxWordLen = 20; + public int dictSize = 1000; + int minKeyLen = 10; + int maxKeyLen = 50; + int minValLength = 100; + int maxValLength = 200; + int minBlockSize = 64 * 1024; + int fsOutputBufferSize = 1; + int fsInputBufferSize = 256 * 1024; + long fileSize = 3 * 1024 * 1024; + long seekCount = 1000; + String compress = "gz"; + + } + + TestConf tconf = new TestConf(); + + public void setUp() throws IOException + { + conf = new Configuration(); + + conf.setInt("tfile.fs.input.buffer.size", tconf.fsInputBufferSize); + conf.setInt("tfile.fs.output.buffer.size", tconf.fsOutputBufferSize); + path = new Path("tmp/dtfile"); + fs = path.getFileSystem(conf); + timer = new NanoTimer(false); + rng = new Random(); + keyLenGen = + new RandomDistribution.Zipf(new Random(rng.nextLong()), + tconf.minKeyLen, tconf.maxKeyLen, 1.2); + RandomDistribution.DiscreteRNG valLenGen = + new RandomDistribution.Flat(new Random(rng.nextLong()), + tconf.minValLength, tconf.maxValLength); + RandomDistribution.DiscreteRNG wordLenGen = + new RandomDistribution.Flat(new Random(rng.nextLong()), + tconf.minWordLen, tconf.maxWordLen); + kvGen = + new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen, + tconf.dictSize); + } + + + private static FSDataOutputStream createFSOutput(Path name, FileSystem fs) + throws IOException { + if (fs.exists(name)) { + fs.delete(name, true); + } + 
FSDataOutputStream fout = fs.create(name); + return fout; + } + + int tuples = 0; + + private void writeTFile() throws IOException + { + + FSDataOutputStream fout = createFSOutput(path, fs); + byte[] key = new byte[16]; + ByteBuffer bb = ByteBuffer.wrap(key); + try { + DTFile.Writer writer = + new DTFile.Writer(fout, tconf.minBlockSize, tconf.compress, "memcmp", + conf); + try { + BytesWritable tmpKey = new BytesWritable(); + BytesWritable val = new BytesWritable(); + for (long i = 0; true; ++i) { + if (i % 1000 == 0) { // test the size for every 1000 rows. + if (fs.getFileStatus(path).getLen() >= tconf.fileSize) { + break; + } + } + bb.clear(); + bb.putLong(i); + kvGen.next(tmpKey, val, false); + writer.append(key, 0, key.length, val.get(), 0, val + .getSize()); + tuples++; + } + } + finally { + writer.close(); + } + } + finally { + fout.close(); + } + + long fsize = fs.getFileStatus(path).getLen(); + + System.out.println("Total tuple wrote " + tuples + " File size " + fsize / (1024.0 * 1024)); + } + + + + @Test + public void seekDTFile() throws IOException + { + Random random = new Random(); + int ikey = random.nextInt(tuples); + byte[] key = new byte[16]; + ByteBuffer bb = ByteBuffer.wrap(key); + bb.putLong(ikey); + + FSDataInputStream fsdis = fs.open(path); + + if (CacheManager.getCache() != null) { + CacheManager.getCache().invalidateAll(); + } + CacheManager.setEnableStats(true); + Assert.assertEquals("Cache Contains no block", CacheManager.getCacheSize(), 0); + + DTFile.Reader reader = new DTFile.Reader(fsdis, fs.getFileStatus(path).getLen(), conf); + DTFile.Reader.Scanner scanner = reader.createScanner(); + + /* Read first key in the file */ + long numBlocks = CacheManager.getCacheSize(); + scanner.lowerBound(key); + Assert.assertEquals("Cache contains some blocks ", numBlocks + 1, CacheManager.getCacheSize()); + + /* Next key does not add a new block in cache, it reads directly from cache */ + // close scanner, so that it does not use its own cache. 
+ scanner.close(); + ikey++; + bb.clear(); + bb.putLong(ikey); + + numBlocks = CacheManager.getCacheSize(); + long hit = CacheManager.getCache().stats().hitCount(); + scanner.lowerBound(key); + Assert.assertEquals("Cache contains some blocks ", CacheManager.getCacheSize(), numBlocks); + Assert.assertEquals("Cache hit ", CacheManager.getCache().stats().hitCount(), hit+1); + + /* test cache miss */ + scanner.close(); + hit = CacheManager.getCache().stats().hitCount(); + long oldmiss = CacheManager.getCache().stats().missCount(); + ikey = tuples-1; + bb.clear(); + bb.putLong(ikey); + numBlocks = CacheManager.getCacheSize(); + scanner.lowerBound(key); + Assert.assertEquals("Cache contains one more blocks ", CacheManager.getCacheSize(), numBlocks + 1); + Assert.assertEquals("No cache hit ", CacheManager.getCache().stats().hitCount(), hit); + Assert.assertEquals("Cache miss", CacheManager.getCache().stats().missCount(), oldmiss + 1); + + Assert.assertEquals("Reverse lookup cache and block cache has same number of entries", + reader.readerBCF.getCacheKeys().size(), CacheManager.getCacheSize()); + reader.close(); + Assert.assertEquals("Cache blocks are deleted on reader close ", CacheManager.getCacheSize(), 0); + Assert.assertEquals("Size of reverse lookup cache is zero ", 0, reader.readerBCF.getCacheKeys().size()); + } + + @Test + public void checkInvalidKeys() + { + /* invalidating non existing key do not throw exception */ + List<String> lst = new LinkedList<String>(); + lst.add("One"); + lst.add("Two"); + CacheManager.getCache().invalidateAll(lst); + } + + @Before + public void createDTfile() throws IOException + { + setUp(); + writeTFile(); + } + +}
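Editor's note: the cache assertions in DTFileTest.seekDTFile() above reduce to a short read pattern. The following is a minimal sketch of that pattern, assuming the same DTFile/CacheManager API and imports used in DTFileTest; the path ("tmp/dtfile") and 16-byte long-encoded keys are simply what writeTFile() produces in this test, not a general contract.

    // Sketch (test-method body, same imports as DTFileTest above; API assumed from the diff).
    Configuration conf = new Configuration();
    Path path = new Path("tmp/dtfile");                      // file produced by writeTFile()
    FileSystem fs = path.getFileSystem(conf);
    FSDataInputStream fsdis = fs.open(path);

    CacheManager.setEnableStats(true);                        // enable hit/miss counters
    long blocksBefore = CacheManager.getCacheSize();

    DTFile.Reader reader = new DTFile.Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
    DTFile.Reader.Scanner scanner = reader.createScanner();

    byte[] key = new byte[16];
    ByteBuffer.wrap(key).putLong(0L);                         // keys are longs packed into 16-byte buffers
    scanner.lowerBound(key);                                  // first seek pulls one block into the shared cache
    Assert.assertEquals(blocksBefore + 1, CacheManager.getCacheSize());

    scanner.close();
    reader.close();                                           // seekDTFile() asserts the reader's blocks are evicted here
    fsdis.close();

A second lowerBound() into the same block should then show up as a cache hit rather than a new entry, which is exactly what the hitCount()/missCount() assertions in the test verify.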
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java new file mode 100644 index 0000000..49fedeb --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFile.java @@ -0,0 +1,432 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.file.tfile.DTFile.Reader; +import org.apache.hadoop.io.file.tfile.DTFile.Writer; +import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner; + +/** + * test tfile features. 
+ * + */ +public class TestDTFile extends TestCase { + private static String ROOT = + System.getProperty("test.build.data", "/tmp/tfile-test"); + private FileSystem fs; + private Configuration conf; + private static final int minBlockSize = 512; + private static final int largeVal = 3 * 1024 * 1024; + private static final String localFormatter = "%010d"; + + @Override + public void setUp() throws IOException { + conf = new Configuration(); + fs = FileSystem.get(conf); + } + + @Override + public void tearDown() throws IOException { + // do nothing + } + + // read a key from the scanner + public byte[] readKey(Scanner scanner) throws IOException { + int keylen = scanner.entry().getKeyLength(); + byte[] read = new byte[keylen]; + scanner.entry().getKey(read); + return read; + } + + // read a value from the scanner + public byte[] readValue(Scanner scanner) throws IOException { + int valueLen = scanner.entry().getValueLength(); + byte[] read = new byte[valueLen]; + scanner.entry().getValue(read); + return read; + } + + // read a long value from the scanner + public byte[] readLongValue(Scanner scanner, int len) throws IOException { + DataInputStream din = scanner.entry().getValueStream(); + byte[] b = new byte[len]; + din.readFully(b); + din.close(); + return b; + } + + // write some records into the tfile + // write them twice + private int writeSomeRecords(Writer writer, int start, int n) + throws IOException { + String value = "value"; + for (int i = start; i < (start + n); i++) { + String key = String.format(localFormatter, i); + writer.append(key.getBytes(), (value + key).getBytes()); + writer.append(key.getBytes(), (value + key).getBytes()); + } + return (start + n); + } + + // read the records and check + private int readAndCheckbytes(Scanner scanner, int start, int n) + throws IOException { + String value = "value"; + for (int i = start; i < (start + n); i++) { + byte[] key = readKey(scanner); + byte[] val = readValue(scanner); + String keyStr = String.format(localFormatter, i); + String valStr = value + keyStr; + assertTrue("btyes for keys do not match " + keyStr + " " + + new String(key), Arrays.equals(keyStr.getBytes(), key)); + assertTrue("bytes for vals do not match " + valStr + " " + + new String(val), Arrays.equals( + valStr.getBytes(), val)); + assertTrue(scanner.advance()); + key = readKey(scanner); + val = readValue(scanner); + assertTrue("btyes for keys do not match", Arrays.equals( + keyStr.getBytes(), key)); + assertTrue("bytes for vals do not match", Arrays.equals( + valStr.getBytes(), val)); + assertTrue(scanner.advance()); + } + return (start + n); + } + + // write some large records + // write them twice + private int writeLargeRecords(Writer writer, int start, int n) + throws IOException { + byte[] value = new byte[largeVal]; + for (int i = start; i < (start + n); i++) { + String key = String.format(localFormatter, i); + writer.append(key.getBytes(), value); + writer.append(key.getBytes(), value); + } + return (start + n); + } + + // read large records + // read them twice since its duplicated + private int readLargeRecords(Scanner scanner, int start, int n) + throws IOException { + for (int i = start; i < (start + n); i++) { + byte[] key = readKey(scanner); + String keyStr = String.format(localFormatter, i); + assertTrue("btyes for keys do not match", Arrays.equals( + keyStr.getBytes(), key)); + scanner.advance(); + key = readKey(scanner); + assertTrue("btyes for keys do not match", Arrays.equals( + keyStr.getBytes(), key)); + scanner.advance(); + } + return (start + 
n); + } + + // write empty keys and values + private void writeEmptyRecords(Writer writer, int n) throws IOException { + byte[] key = new byte[0]; + byte[] value = new byte[0]; + for (int i = 0; i < n; i++) { + writer.append(key, value); + } + } + + // read empty keys and values + private void readEmptyRecords(Scanner scanner, int n) throws IOException { + byte[] key = new byte[0]; + byte[] value = new byte[0]; + byte[] readKey = null; + byte[] readValue = null; + for (int i = 0; i < n; i++) { + readKey = readKey(scanner); + readValue = readValue(scanner); + assertTrue("failed to match keys", Arrays.equals(readKey, key)); + assertTrue("failed to match values", Arrays.equals(readValue, value)); + assertTrue("failed to advance cursor", scanner.advance()); + } + } + + private int writePrepWithKnownLength(Writer writer, int start, int n) + throws IOException { + // get the length of the key + String key = String.format(localFormatter, start); + int keyLen = key.getBytes().length; + String value = "value" + key; + int valueLen = value.getBytes().length; + for (int i = start; i < (start + n); i++) { + DataOutputStream out = writer.prepareAppendKey(keyLen); + String localKey = String.format(localFormatter, i); + out.write(localKey.getBytes()); + out.close(); + out = writer.prepareAppendValue(valueLen); + String localValue = "value" + localKey; + out.write(localValue.getBytes()); + out.close(); + } + return (start + n); + } + + private int readPrepWithKnownLength(Scanner scanner, int start, int n) + throws IOException { + for (int i = start; i < (start + n); i++) { + String key = String.format(localFormatter, i); + byte[] read = readKey(scanner); + assertTrue("keys not equal", Arrays.equals(key.getBytes(), read)); + String value = "value" + key; + read = readValue(scanner); + assertTrue("values not equal", Arrays.equals(value.getBytes(), read)); + scanner.advance(); + } + return (start + n); + } + + private int writePrepWithUnkownLength(Writer writer, int start, int n) + throws IOException { + for (int i = start; i < (start + n); i++) { + DataOutputStream out = writer.prepareAppendKey(-1); + String localKey = String.format(localFormatter, i); + out.write(localKey.getBytes()); + out.close(); + String value = "value" + localKey; + out = writer.prepareAppendValue(-1); + out.write(value.getBytes()); + out.close(); + } + return (start + n); + } + + private int readPrepWithUnknownLength(Scanner scanner, int start, int n) + throws IOException { + for (int i = start; i < start; i++) { + String key = String.format(localFormatter, i); + byte[] read = readKey(scanner); + assertTrue("keys not equal", Arrays.equals(key.getBytes(), read)); + try { + read = readValue(scanner); + assertTrue(false); + } + catch (IOException ie) { + // should have thrown exception + } + String value = "value" + key; + read = readLongValue(scanner, value.getBytes().length); + assertTrue("values nto equal", Arrays.equals(read, value.getBytes())); + scanner.advance(); + } + return (start + n); + } + + private byte[] getSomeKey(int rowId) { + return String.format(localFormatter, rowId).getBytes(); + } + + private void writeRecords(Writer writer) throws IOException { + writeEmptyRecords(writer, 10); + int ret = writeSomeRecords(writer, 0, 100); + ret = writeLargeRecords(writer, ret, 1); + ret = writePrepWithKnownLength(writer, ret, 40); + ret = writePrepWithUnkownLength(writer, ret, 50); + writer.close(); + } + + private void readAllRecords(Scanner scanner) throws IOException { + readEmptyRecords(scanner, 10); + int ret = 
readAndCheckbytes(scanner, 0, 100); + ret = readLargeRecords(scanner, ret, 1); + ret = readPrepWithKnownLength(scanner, ret, 40); + ret = readPrepWithUnknownLength(scanner, ret, 50); + } + + private FSDataOutputStream createFSOutput(Path name) throws IOException { + if (fs.exists(name)) fs.delete(name, true); + FSDataOutputStream fout = fs.create(name); + return fout; + } + + /** + * test none codecs + */ + void basicWithSomeCodec(String codec) throws IOException { + Path ncTFile = new Path(ROOT, "basic.tfile"); + FSDataOutputStream fout = createFSOutput(ncTFile); + Writer writer = new Writer(fout, minBlockSize, codec, "memcmp", conf); + writeRecords(writer); + fout.close(); + FSDataInputStream fin = fs.open(ncTFile); + Reader reader = + new Reader(fs.open(ncTFile), fs.getFileStatus(ncTFile).getLen(), conf); + + Scanner scanner = reader.createScanner(); + readAllRecords(scanner); + scanner.seekTo(getSomeKey(50)); + assertTrue("location lookup failed", scanner.seekTo(getSomeKey(50))); + // read the key and see if it matches + byte[] readKey = readKey(scanner); + assertTrue("seeked key does not match", Arrays.equals(getSomeKey(50), + readKey)); + + scanner.seekTo(new byte[0]); + byte[] val1 = readValue(scanner); + scanner.seekTo(new byte[0]); + byte[] val2 = readValue(scanner); + assertTrue(Arrays.equals(val1, val2)); + + // check for lowerBound + scanner.lowerBound(getSomeKey(50)); + assertTrue("locaton lookup failed", scanner.currentLocation + .compareTo(reader.end()) < 0); + readKey = readKey(scanner); + assertTrue("seeked key does not match", Arrays.equals(readKey, + getSomeKey(50))); + + // check for upper bound + scanner.upperBound(getSomeKey(50)); + assertTrue("location lookup failed", scanner.currentLocation + .compareTo(reader.end()) < 0); + readKey = readKey(scanner); + assertTrue("seeked key does not match", Arrays.equals(readKey, + getSomeKey(51))); + + scanner.close(); + // test for a range of scanner + scanner = reader.createScannerByKey(getSomeKey(10), getSomeKey(60)); + readAndCheckbytes(scanner, 10, 50); + assertFalse(scanner.advance()); + scanner.close(); + reader.close(); + fin.close(); + fs.delete(ncTFile, true); + } + + // unsorted with some codec + void unsortedWithSomeCodec(String codec) throws IOException { + Path uTfile = new Path(ROOT, "unsorted.tfile"); + FSDataOutputStream fout = createFSOutput(uTfile); + Writer writer = new Writer(fout, minBlockSize, codec, null, conf); + writeRecords(writer); + writer.close(); + fout.close(); + FSDataInputStream fin = fs.open(uTfile); + Reader reader = + new Reader(fs.open(uTfile), fs.getFileStatus(uTfile).getLen(), conf); + + Scanner scanner = reader.createScanner(); + readAllRecords(scanner); + scanner.close(); + reader.close(); + fin.close(); + fs.delete(uTfile, true); + } + + public void testTFileFeatures() throws IOException { + basicWithSomeCodec("none"); + basicWithSomeCodec("gz"); + } + + // test unsorted t files. 
+ public void testUnsortedTFileFeatures() throws IOException { + unsortedWithSomeCodec("none"); + unsortedWithSomeCodec("gz"); + } + + private void writeNumMetablocks(Writer writer, String compression, int n) + throws IOException { + for (int i = 0; i < n; i++) { + DataOutputStream dout = + writer.prepareMetaBlock("TfileMeta" + i, compression); + byte[] b = ("something to test" + i).getBytes(); + dout.write(b); + dout.close(); + } + } + + private void someTestingWithMetaBlock(Writer writer, String compression) + throws IOException { + DataOutputStream dout = null; + writeNumMetablocks(writer, compression, 10); + try { + dout = writer.prepareMetaBlock("TfileMeta1", compression); + assertTrue(false); + } + catch (MetaBlockAlreadyExists me) { + // avoid this exception + } + dout = writer.prepareMetaBlock("TFileMeta100", compression); + dout.close(); + } + + private void readNumMetablocks(Reader reader, int n) throws IOException { + int len = ("something to test" + 0).getBytes().length; + for (int i = 0; i < n; i++) { + DataInputStream din = reader.getMetaBlock("TfileMeta" + i); + byte b[] = new byte[len]; + din.readFully(b); + assertTrue("faield to match metadata", Arrays.equals( + ("something to test" + i).getBytes(), b)); + din.close(); + } + } + + private void someReadingWithMetaBlock(Reader reader) throws IOException { + DataInputStream din = null; + readNumMetablocks(reader, 10); + try { + din = reader.getMetaBlock("NO ONE"); + assertTrue(false); + } + catch (MetaBlockDoesNotExist me) { + // should catch + } + din = reader.getMetaBlock("TFileMeta100"); + int read = din.read(); + assertTrue("check for status", (read == -1)); + din.close(); + } + + // test meta blocks for tfiles + public void _testMetaBlocks() throws IOException { + Path mFile = new Path(ROOT, "meta.tfile"); + FSDataOutputStream fout = createFSOutput(mFile); + Writer writer = new Writer(fout, minBlockSize, "none", null, conf); + someTestingWithMetaBlock(writer, "none"); + writer.close(); + fout.close(); + FSDataInputStream fin = fs.open(mFile); + Reader reader = new Reader(fin, fs.getFileStatus(mFile).getLen(), conf); + someReadingWithMetaBlock(reader); + fs.delete(mFile, true); + reader.close(); + fin.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java new file mode 100644 index 0000000..a1fa5c8 --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestDTFileByteArrays.java @@ -0,0 +1,773 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.compress.zlib.ZlibFactory; +import org.apache.hadoop.io.file.tfile.DTFile.Reader; +import org.apache.hadoop.io.file.tfile.DTFile.Writer; +import org.apache.hadoop.io.file.tfile.DTFile.Reader.Location; +import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * + * Byte arrays test case class using GZ compression codec, base class of none + * and LZO compression classes. + * + */ +public class TestDTFileByteArrays { + private static String ROOT = + System.getProperty("test.build.data", "/tmp/tfile-test"); + private final static int BLOCK_SIZE = 512; + private final static int BUF_SIZE = 64; + private final static int K = 1024; + protected boolean skip = false; + + private static final String KEY = "key"; + private static final String VALUE = "value"; + + private FileSystem fs; + private Configuration conf = new Configuration(); + private Path path; + private FSDataOutputStream out; + private Writer writer; + + private String compression = Compression.Algorithm.GZ.getName(); + private String comparator = "memcmp"; + private final String outputFile = getClass().getSimpleName(); + + /* + * pre-sampled numbers of records in one block, based on the given the + * generated key and value strings. This is slightly different based on + * whether or not the native libs are present. + */ + private boolean usingNative = ZlibFactory.isNativeZlibLoaded(conf); + private int records1stBlock = usingNative ? 5674 : 4480; + private int records2ndBlock = usingNative ? 
5574 : 4263; + + public void init(String compression, String comparator, + int numRecords1stBlock, int numRecords2ndBlock) { + init(compression, comparator); + this.records1stBlock = numRecords1stBlock; + this.records2ndBlock = numRecords2ndBlock; + } + + public void init(String compression, String comparator) { + this.compression = compression; + this.comparator = comparator; + } + + @Before + public void setUp() throws IOException { + path = new Path(ROOT, outputFile); + fs = path.getFileSystem(conf); + out = fs.create(path); + writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf); + } + + @After + public void tearDown() throws IOException { + if (!skip) + fs.delete(path, true); + } + + @Test + public void testNoDataEntry() throws IOException { + if (skip) + return; + closeOutput(); + + Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Assert.assertTrue(reader.isSorted()); + Scanner scanner = reader.createScanner(); + Assert.assertTrue(scanner.atEnd()); + scanner.close(); + reader.close(); + } + + @Test + public void testOneDataEntry() throws IOException { + if (skip) + return; + writeRecords(1); + readRecords(1); + + checkBlockIndex(0, 0); + readValueBeforeKey(0); + readKeyWithoutValue(0); + readValueWithoutKey(0); + readKeyManyTimes(0); + } + + @Test + public void testTwoDataEntries() throws IOException { + if (skip) + return; + writeRecords(2); + readRecords(2); + } + + /** + * Fill up exactly one block. + * + * @throws IOException + */ + @Test + public void testOneBlock() throws IOException { + if (skip) + return; + // just under one block + writeRecords(records1stBlock); + readRecords(records1stBlock); + // last key should be in the first block (block 0) + checkBlockIndex(records1stBlock - 1, 0); + } + + /** + * One block plus one record. 
+ * + * @throws IOException + */ + @Test + public void testOneBlockPlusOneEntry() throws IOException { + if (skip) + return; + writeRecords(records1stBlock + 1); + readRecords(records1stBlock + 1); + checkBlockIndex(records1stBlock - 1, 0); + checkBlockIndex(records1stBlock, 1); + } + + @Test + public void testTwoBlocks() throws IOException { + if (skip) + return; + writeRecords(records1stBlock + 5); + readRecords(records1stBlock + 5); + checkBlockIndex(records1stBlock + 4, 1); + } + + @Test + public void testThreeBlocks() throws IOException { + if (skip) + return; + writeRecords(2 * records1stBlock + 5); + readRecords(2 * records1stBlock + 5); + + checkBlockIndex(2 * records1stBlock + 4, 2); + // 1st key in file + readValueBeforeKey(0); + readKeyWithoutValue(0); + readValueWithoutKey(0); + readKeyManyTimes(0); + // last key in file + readValueBeforeKey(2 * records1stBlock + 4); + readKeyWithoutValue(2 * records1stBlock + 4); + readValueWithoutKey(2 * records1stBlock + 4); + readKeyManyTimes(2 * records1stBlock + 4); + + // 1st key in mid block, verify block indexes then read + checkBlockIndex(records1stBlock - 1, 0); + checkBlockIndex(records1stBlock, 1); + readValueBeforeKey(records1stBlock); + readKeyWithoutValue(records1stBlock); + readValueWithoutKey(records1stBlock); + readKeyManyTimes(records1stBlock); + + // last key in mid block, verify block indexes then read + checkBlockIndex(records1stBlock + records2ndBlock + - 1, 1); + checkBlockIndex(records1stBlock + records2ndBlock, 2); + readValueBeforeKey(records1stBlock + + records2ndBlock - 1); + readKeyWithoutValue(records1stBlock + + records2ndBlock - 1); + readValueWithoutKey(records1stBlock + + records2ndBlock - 1); + readKeyManyTimes(records1stBlock + records2ndBlock + - 1); + + // mid in mid block + readValueBeforeKey(records1stBlock + 10); + readKeyWithoutValue(records1stBlock + 10); + readValueWithoutKey(records1stBlock + 10); + readKeyManyTimes(records1stBlock + 10); + } + + Location locate(Scanner scanner, byte[] key) throws IOException { + if (scanner.seekTo(key) == true) { + return scanner.currentLocation; + } + return scanner.endLocation; + } + + @Test + public void testLocate() throws IOException { + if (skip) + return; + writeRecords(3 * records1stBlock); + Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Scanner scanner = reader.createScanner(); + locate(scanner, composeSortedKey(KEY, 2).getBytes()); + locate(scanner, composeSortedKey(KEY, records1stBlock - 1).getBytes()); + locate(scanner, composeSortedKey(KEY, records1stBlock).getBytes()); + Location locX = locate(scanner, "keyX".getBytes()); + Assert.assertEquals(scanner.endLocation, locX); + scanner.close(); + reader.close(); + } + + @Test + public void testFailureWriterNotClosed() throws IOException { + if (skip) + return; + Reader reader = null; + try { + reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Assert.fail("Cannot read before closing the writer."); + } catch (IOException e) { + // noop, expecting exceptions + } finally { + if (reader != null) { + reader.close(); + } + } + } + + @Test + public void testFailureWriteMetaBlocksWithSameName() throws IOException { + if (skip) + return; + writer.append("keyX".getBytes(), "valueX".getBytes()); + + // create a new metablock + DataOutputStream outMeta = + writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName()); + outMeta.write(123); + outMeta.write("foo".getBytes()); + outMeta.close(); + // add the same metablock + try { + 
writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName()); + Assert.fail("Cannot create metablocks with the same name."); + } catch (Exception e) { + // noop, expecting exceptions + } + closeOutput(); + } + + @Test + public void testFailureGetNonExistentMetaBlock() throws IOException { + if (skip) + return; + writer.append("keyX".getBytes(), "valueX".getBytes()); + + // create a new metablock + DataOutputStream outMeta = + writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName()); + outMeta.write(123); + outMeta.write("foo".getBytes()); + outMeta.close(); + closeOutput(); + + Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + DataInputStream mb = reader.getMetaBlock("testX"); + Assert.assertNotNull(mb); + mb.close(); + try { + DataInputStream mbBad = reader.getMetaBlock("testY"); + Assert.fail("Error on handling non-existent metablocks."); + } catch (Exception e) { + // noop, expecting exceptions + } + reader.close(); + } + + @Test + public void testFailureWriteRecordAfterMetaBlock() throws IOException { + if (skip) + return; + // write a key/value first + writer.append("keyX".getBytes(), "valueX".getBytes()); + // create a new metablock + DataOutputStream outMeta = + writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName()); + outMeta.write(123); + outMeta.write("dummy".getBytes()); + outMeta.close(); + // add more key/value + try { + writer.append("keyY".getBytes(), "valueY".getBytes()); + Assert.fail("Cannot add key/value after start adding meta blocks."); + } catch (Exception e) { + // noop, expecting exceptions + } + closeOutput(); + } + + @Test + public void testFailureReadValueManyTimes() throws IOException { + if (skip) + return; + writeRecords(5); + + Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Scanner scanner = reader.createScanner(); + + byte[] vbuf = new byte[BUF_SIZE]; + int vlen = scanner.entry().getValueLength(); + scanner.entry().getValue(vbuf); + Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + 0); + try { + scanner.entry().getValue(vbuf); + Assert.fail("Cannot get the value mlutiple times."); + } catch (Exception e) { + // noop, expecting exceptions + } + + scanner.close(); + reader.close(); + } + + @Test + public void testFailureBadCompressionCodec() throws IOException { + if (skip) + return; + closeOutput(); + out = fs.create(path); + try { + writer = new Writer(out, BLOCK_SIZE, "BAD", comparator, conf); + Assert.fail("Error on handling invalid compression codecs."); + } catch (Exception e) { + // noop, expecting exceptions + // e.printStackTrace(); + } + } + + @Test + public void testFailureOpenEmptyFile() throws IOException { + if (skip) + return; + closeOutput(); + // create an absolutely empty file + path = new Path(fs.getWorkingDirectory(), outputFile); + out = fs.create(path); + out.close(); + try { + new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Assert.fail("Error on handling empty files."); + } catch (EOFException e) { + // noop, expecting exceptions + } + } + + @Test + public void testFailureOpenRandomFile() throws IOException { + if (skip) + return; + closeOutput(); + // create an random file + path = new Path(fs.getWorkingDirectory(), outputFile); + out = fs.create(path); + Random rand = new Random(); + byte[] buf = new byte[K]; + // fill with > 1MB data + for (int nx = 0; nx < K + 2; nx++) { + rand.nextBytes(buf); + out.write(buf); + } + out.close(); + try { + new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + 
Assert.fail("Error on handling random files."); + } catch (IOException e) { + // noop, expecting exceptions + } + } + + @Test + public void testFailureKeyLongerThan64K() throws IOException { + if (skip) + return; + byte[] buf = new byte[64 * K + 1]; + Random rand = new Random(); + rand.nextBytes(buf); + try { + writer.append(buf, "valueX".getBytes()); + } catch (IndexOutOfBoundsException e) { + // noop, expecting exceptions + } + closeOutput(); + } + + @Test + public void testFailureOutOfOrderKeys() throws IOException { + if (skip) + return; + try { + writer.append("keyM".getBytes(), "valueM".getBytes()); + writer.append("keyA".getBytes(), "valueA".getBytes()); + Assert.fail("Error on handling out of order keys."); + } catch (Exception e) { + // noop, expecting exceptions + // e.printStackTrace(); + } + + closeOutput(); + } + + @Test + public void testFailureNegativeOffset() throws IOException { + if (skip) + return; + try { + writer.append("keyX".getBytes(), -1, 4, "valueX".getBytes(), 0, 6); + Assert.fail("Error on handling negative offset."); + } catch (Exception e) { + // noop, expecting exceptions + } + closeOutput(); + } + + @Test + public void testFailureNegativeOffset_2() throws IOException { + if (skip) + return; + closeOutput(); + + Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Scanner scanner = reader.createScanner(); + try { + scanner.lowerBound("keyX".getBytes(), -1, 4); + Assert.fail("Error on handling negative offset."); + } catch (Exception e) { + // noop, expecting exceptions + } finally { + reader.close(); + scanner.close(); + } + closeOutput(); + } + + @Test + public void testFailureNegativeLength() throws IOException { + if (skip) + return; + try { + writer.append("keyX".getBytes(), 0, -1, "valueX".getBytes(), 0, 6); + Assert.fail("Error on handling negative length."); + } catch (Exception e) { + // noop, expecting exceptions + } + closeOutput(); + } + + @Test + public void testFailureNegativeLength_2() throws IOException { + if (skip) + return; + closeOutput(); + + Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Scanner scanner = reader.createScanner(); + try { + scanner.lowerBound("keyX".getBytes(), 0, -1); + Assert.fail("Error on handling negative length."); + } catch (Exception e) { + // noop, expecting exceptions + } finally { + scanner.close(); + reader.close(); + } + closeOutput(); + } + + @Test + public void testFailureNegativeLength_3() throws IOException { + if (skip) + return; + writeRecords(3); + + Reader reader = + new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Scanner scanner = reader.createScanner(); + try { + // test negative array offset + try { + scanner.seekTo("keyY".getBytes(), -1, 4); + Assert.fail("Failed to handle negative offset."); + } catch (Exception e) { + // noop, expecting exceptions + } + + // test negative array length + try { + scanner.seekTo("keyY".getBytes(), 0, -2); + Assert.fail("Failed to handle negative key length."); + } catch (Exception e) { + // noop, expecting exceptions + } + } finally { + reader.close(); + scanner.close(); + } + } + + @Test + public void testFailureCompressionNotWorking() throws IOException { + if (skip) + return; + long rawDataSize = writeRecords(10 * records1stBlock, false); + if (!compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName())) { + Assert.assertTrue(out.getPos() < rawDataSize); + } + closeOutput(); + } + + @Test + public void testFailureFileWriteNotAt0Position() throws IOException { + if 
(skip) + return; + closeOutput(); + out = fs.create(path); + out.write(123); + + try { + writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf); + Assert.fail("Failed to catch file write not at position 0."); + } catch (Exception e) { + // noop, expecting exceptions + } + closeOutput(); + } + + private long writeRecords(int count) throws IOException { + return writeRecords(count, true); + } + + private long writeRecords(int count, boolean close) throws IOException { + long rawDataSize = writeRecords(writer, count); + if (close) { + closeOutput(); + } + return rawDataSize; + } + + static long writeRecords(Writer writer, int count) throws IOException { + long rawDataSize = 0; + int nx; + for (nx = 0; nx < count; nx++) { + byte[] key = composeSortedKey(KEY, nx).getBytes(); + byte[] value = (VALUE + nx).getBytes(); + writer.append(key, value); + rawDataSize += + WritableUtils.getVIntSize(key.length) + key.length + + WritableUtils.getVIntSize(value.length) + value.length; + } + return rawDataSize; + } + + /** + * Insert some leading 0's in front of the value, to make the keys sorted. + * + * @param prefix prefix + * @param value value + * @return sorted key + */ + static String composeSortedKey(String prefix, int value) { + return String.format("%s%010d", prefix, value); + } + + private void readRecords(int count) throws IOException { + readRecords(fs, path, count, conf); + } + + static void readRecords(FileSystem fs, Path path, int count, + Configuration conf) throws IOException { + Reader reader = + new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Scanner scanner = reader.createScanner(); + + try { + for (int nx = 0; nx < count; nx++, scanner.advance()) { + Assert.assertFalse(scanner.atEnd()); + // Assert.assertTrue(scanner.next()); + + byte[] kbuf = new byte[BUF_SIZE]; + int klen = scanner.entry().getKeyLength(); + scanner.entry().getKey(kbuf); + Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY, + nx)); + + byte[] vbuf = new byte[BUF_SIZE]; + int vlen = scanner.entry().getValueLength(); + scanner.entry().getValue(vbuf); + Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + nx); + } + + Assert.assertTrue(scanner.atEnd()); + Assert.assertFalse(scanner.advance()); + } finally { + scanner.close(); + reader.close(); + } + } + + private void checkBlockIndex(int recordIndex, int blockIndexExpected) throws IOException { + Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Scanner scanner = reader.createScanner(); + scanner.seekTo(composeSortedKey(KEY, recordIndex).getBytes()); + Assert.assertEquals(blockIndexExpected, scanner.currentLocation + .getBlockIndex()); + scanner.close(); + reader.close(); + } + + private void readValueBeforeKey(int recordIndex) + throws IOException { + Reader reader = + new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Scanner scanner = + reader.createScannerByKey(composeSortedKey(KEY, recordIndex) + .getBytes(), null); + + try { + byte[] vbuf = new byte[BUF_SIZE]; + int vlen = scanner.entry().getValueLength(); + scanner.entry().getValue(vbuf); + Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + recordIndex); + + byte[] kbuf = new byte[BUF_SIZE]; + int klen = scanner.entry().getKeyLength(); + scanner.entry().getKey(kbuf); + Assert.assertEquals(new String(kbuf, 0, klen), composeSortedKey(KEY, + recordIndex)); + } finally { + scanner.close(); + reader.close(); + } + } + + private void readKeyWithoutValue(int recordIndex) + throws IOException { + Reader reader = new 
Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + Scanner scanner = + reader.createScannerByKey(composeSortedKey(KEY, recordIndex) + .getBytes(), null); + + try { + // read the indexed key + byte[] kbuf1 = new byte[BUF_SIZE]; + int klen1 = scanner.entry().getKeyLength(); + scanner.entry().getKey(kbuf1); + Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY, + recordIndex)); + + if (scanner.advance() && !scanner.atEnd()) { + // read the next key following the indexed + byte[] kbuf2 = new byte[BUF_SIZE]; + int klen2 = scanner.entry().getKeyLength(); + scanner.entry().getKey(kbuf2); + Assert.assertEquals(new String(kbuf2, 0, klen2), composeSortedKey(KEY, + recordIndex + 1)); + } + } finally { + scanner.close(); + reader.close(); + } + } + + private void readValueWithoutKey(int recordIndex) + throws IOException { + Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + + Scanner scanner = + reader.createScannerByKey(composeSortedKey(KEY, recordIndex) + .getBytes(), null); + + byte[] vbuf1 = new byte[BUF_SIZE]; + int vlen1 = scanner.entry().getValueLength(); + scanner.entry().getValue(vbuf1); + Assert.assertEquals(new String(vbuf1, 0, vlen1), VALUE + recordIndex); + + if (scanner.advance() && !scanner.atEnd()) { + byte[] vbuf2 = new byte[BUF_SIZE]; + int vlen2 = scanner.entry().getValueLength(); + scanner.entry().getValue(vbuf2); + Assert.assertEquals(new String(vbuf2, 0, vlen2), VALUE + + (recordIndex + 1)); + } + + scanner.close(); + reader.close(); + } + + private void readKeyManyTimes(int recordIndex) throws IOException { + Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + + Scanner scanner = + reader.createScannerByKey(composeSortedKey(KEY, recordIndex) + .getBytes(), null); + + // read the indexed key + byte[] kbuf1 = new byte[BUF_SIZE]; + int klen1 = scanner.entry().getKeyLength(); + scanner.entry().getKey(kbuf1); + Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY, + recordIndex)); + + klen1 = scanner.entry().getKeyLength(); + scanner.entry().getKey(kbuf1); + Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY, + recordIndex)); + + klen1 = scanner.entry().getKeyLength(); + scanner.entry().getKey(kbuf1); + Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY, + recordIndex)); + + scanner.close(); + reader.close(); + } + + private void closeOutput() throws IOException { + if (writer != null) { + writer.close(); + writer = null; + } + if (out != null) { + out.close(); + out = null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java new file mode 100644 index 0000000..c313813 --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparator2.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.file.tfile.DTFile.Writer; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestTFileComparator2 { + private static final String ROOT = System.getProperty("test.build.data", + "/tmp/tfile-test"); + private static final String name = "test-tfile-comparator2"; + private final static int BLOCK_SIZE = 512; + private static final String VALUE = "value"; + private static final String jClassLongWritableComparator = "jclass:" + + LongWritable.Comparator.class.getName(); + private static final long NENTRY = 10000; + + private static long cube(long n) { + return n*n*n; + } + + private static String buildValue(long i) { + return String.format("%s-%d", VALUE, i); + } + + @Test + public void testSortedLongWritable() throws IOException { + Configuration conf = new Configuration(); + Path path = new Path(ROOT, name); + FileSystem fs = path.getFileSystem(conf); + FSDataOutputStream out = fs.create(path); + try { + DTFile.Writer writer = new Writer(out, BLOCK_SIZE, "gz", + jClassLongWritableComparator, conf); + try { + LongWritable key = new LongWritable(0); + for (long i=0; i<NENTRY; ++i) { + key.set(cube(i-NENTRY/2)); + DataOutputStream dos = writer.prepareAppendKey(-1); + try { + key.write(dos); + } finally { + dos.close(); + } + dos = writer.prepareAppendValue(-1); + try { + dos.write(buildValue(i).getBytes()); + } finally { + dos.close(); + } + } + } finally { + writer.close(); + } + } finally { + out.close(); + } + + FSDataInputStream in = fs.open(path); + try { + DTFile.Reader reader = new DTFile.Reader(in, fs.getFileStatus(path) + .getLen(), conf); + try { + DTFile.Reader.Scanner scanner = reader.createScanner(); + long i=0; + BytesWritable value = new BytesWritable(); + for (; !scanner.atEnd(); scanner.advance()) { + scanner.entry().getValue(value); + assertEquals(buildValue(i), new String(value.getBytes(), 0, value + .getLength())); + ++i; + } + } finally { + reader.close(); + } + } finally { + in.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java new file mode 100644 index 0000000..0a10468 --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileComparators.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.file.tfile.DTFile.Writer; +import org.junit.Assert; + +/** + * + * Byte arrays test case class using GZ compression codec, base class of none + * and LZO compression classes. + * + */ +public class TestTFileComparators extends TestCase { + private static String ROOT = + System.getProperty("test.build.data", "/tmp/tfile-test"); + + private final static int BLOCK_SIZE = 512; + private FileSystem fs; + private Configuration conf; + private Path path; + private FSDataOutputStream out; + private Writer writer; + + private String compression = Compression.Algorithm.GZ.getName(); + private String outputFile = "TFileTestComparators"; + /* + * pre-sampled numbers of records in one block, based on the given the + * generated key and value strings + */ + // private int records1stBlock = 4314; + // private int records2ndBlock = 4108; + private int records1stBlock = 4480; + private int records2ndBlock = 4263; + + @Override + public void setUp() throws IOException { + conf = new Configuration(); + path = new Path(ROOT, outputFile); + fs = path.getFileSystem(conf); + out = fs.create(path); + } + + @Override + public void tearDown() throws IOException { + fs.delete(path, true); + } + + // bad comparator format + public void testFailureBadComparatorNames() throws IOException { + try { + writer = new Writer(out, BLOCK_SIZE, compression, "badcmp", conf); + Assert.fail("Failed to catch unsupported comparator names"); + } + catch (Exception e) { + // noop, expecting exceptions + e.printStackTrace(); + } + } + + // jclass that doesn't exist + public void testFailureBadJClassNames() throws IOException { + try { + writer = + new Writer(out, BLOCK_SIZE, compression, + "jclass: some.non.existence.clazz", conf); + Assert.fail("Failed to catch unsupported comparator names"); + } + catch (Exception e) { + // noop, expecting exceptions + e.printStackTrace(); + } + } + + // class exists but not a RawComparator + public void testFailureBadJClasses() throws IOException { + try { + writer = + new Writer(out, BLOCK_SIZE, compression, + "jclass:org.apache.hadoop.io.file.tfile.Chunk", conf); + Assert.fail("Failed to catch unsupported comparator names"); + } + catch (Exception e) { + // noop, expecting exceptions + e.printStackTrace(); + } + } + + private void closeOutput() throws IOException { + if (writer != null) { + writer.close(); + writer = null; + } + if (out != null) { + out.close(); + out = null; + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java new file mode 100644 index 0000000..301cffc --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.io.IOException; +import java.io.Serializable; + +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.WritableComparator; + +/** + * + * Byte arrays test case class using GZ compression codec, base class of none + * and LZO compression classes. + * + */ + +public class TestTFileJClassComparatorByteArrays extends TestDTFileByteArrays { + /** + * Test non-compression codec, using the same test cases as in the ByteArrays. + */ + @Override + public void setUp() throws IOException { + init(Compression.Algorithm.GZ.getName(), + "jclass: org.apache.hadoop.io.file.tfile.MyComparator"); + super.setUp(); + } +} + +class MyComparator implements RawComparator<byte[]>, Serializable { + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2); + } + + @Override + public int compare(byte[] o1, byte[] o2) { + return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length); + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java new file mode 100644 index 0000000..20cff9e --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.io.IOException; + +import org.apache.hadoop.io.file.tfile.Compression.Algorithm; + +public class TestTFileLzoCodecsByteArrays extends TestDTFileByteArrays { + /** + * Test LZO compression codec, using the same test cases as in the ByteArrays. + */ + @Override + public void setUp() throws IOException { + skip = !(Algorithm.LZO.isSupported()); + if (skip) { + System.out.println("Skipped"); + } + + // TODO: sample the generated key/value records, and put the numbers below + init(Compression.Algorithm.LZO.getName(), "memcmp", 2605, 2558); + if (!skip) + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java new file mode 100644 index 0000000..7c6581d --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.io.IOException; + +import org.apache.hadoop.io.file.tfile.Compression.Algorithm; + +public class TestTFileLzoCodecsStreams extends TestTFileStreams { + /** + * Test LZO compression codec, using the same test cases as in the ByteArrays. 
+ */ + @Override + public void setUp() throws IOException { + skip = !(Algorithm.LZO.isSupported()); + if (skip) { + System.out.println("Skipped"); + } + init(Compression.Algorithm.LZO.getName(), "memcmp"); + if (!skip) + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java new file mode 100644 index 0000000..c304743 --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.io.IOException; + +public class TestTFileNoneCodecsByteArrays extends TestDTFileByteArrays { + /** + * Test non-compression codec, using the same test cases as in the ByteArrays. + */ + @Override + public void setUp() throws IOException { + init(Compression.Algorithm.NONE.getName(), "memcmp", 24, 24); + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java new file mode 100644 index 0000000..31e3cad --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsJClassComparatorByteArrays.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.io.file.tfile; + +import java.io.IOException; + +/** + * + * Byte arrays test case class using GZ compression codec, base class of none + * and LZO compression classes. + * + */ + +public class TestTFileNoneCodecsJClassComparatorByteArrays extends TestDTFileByteArrays { + /** + * Test non-compression codec, using the same test cases as in the ByteArrays. + */ + @Override + public void setUp() throws IOException { + init(Compression.Algorithm.NONE.getName(), + "jclass: org.apache.hadoop.io.file.tfile.MyComparator", 24, 24); + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java new file mode 100644 index 0000000..06d086b --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsStreams.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.io.IOException; + +public class TestTFileNoneCodecsStreams extends TestTFileStreams { + /** + * Test non-compression codec, using the same test cases as in the ByteArrays. + */ + @Override + public void setUp() throws IOException { + init(Compression.Algorithm.NONE.getName(), "memcmp"); + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/02f48e1b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java new file mode 100644 index 0000000..9f6b3ce --- /dev/null +++ b/library/src/test/java/org/apache/hadoop/io/file/tfile/TestTFileSeek.java @@ -0,0 +1,505 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.io.file.tfile; + +import java.io.IOException; +import java.util.Random; +import java.util.StringTokenizer; + +import junit.framework.TestCase; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.file.tfile.RandomDistribution.DiscreteRNG; +import org.apache.hadoop.io.file.tfile.DTFile.Reader; +import org.apache.hadoop.io.file.tfile.DTFile.Writer; +import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner; + +/** + * test the performance for seek. + * + */ +public class TestTFileSeek extends TestCase { + private MyOptions options; + private Configuration conf; + private Path path; + private FileSystem fs; + private NanoTimer timer; + private Random rng; + private DiscreteRNG keyLenGen; + private KVGenerator kvGen; + + @Override + public void setUp() throws IOException { + if (options == null) { + options = new MyOptions(new String[0]); + } + + conf = new Configuration(); + conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize); + conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize); + path = new Path(new Path(options.rootDir), options.file); + fs = path.getFileSystem(conf); + timer = new NanoTimer(false); + rng = new Random(options.seed); + keyLenGen = + new RandomDistribution.Zipf(new Random(rng.nextLong()), + options.minKeyLen, options.maxKeyLen, 1.2); + DiscreteRNG valLenGen = + new RandomDistribution.Flat(new Random(rng.nextLong()), + options.minValLength, options.maxValLength); + DiscreteRNG wordLenGen = + new RandomDistribution.Flat(new Random(rng.nextLong()), + options.minWordLen, options.maxWordLen); + kvGen = + new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen, + options.dictSize); + } + + @Override + public void tearDown() throws IOException { + fs.delete(path, true); + } + + private static FSDataOutputStream createFSOutput(Path name, FileSystem fs) + throws IOException { + if (fs.exists(name)) { + fs.delete(name, true); + } + FSDataOutputStream fout = fs.create(name); + return fout; + } + + private void createTFile() throws IOException { + long totalBytes = 0; + FSDataOutputStream fout = createFSOutput(path, fs); + try { + Writer writer = + new Writer(fout, options.minBlockSize, options.compress, "memcmp", + conf); + try { + BytesWritable key = new BytesWritable(); + BytesWritable val = new BytesWritable(); + timer.start(); + for (long i = 0; true; ++i) { + if (i % 1000 == 0) { // test the size for every 1000 rows. 
+ if (fs.getFileStatus(path).getLen() >= options.fileSize) { + break; + } + } + kvGen.next(key, val, false); + writer.append(key.get(), 0, key.getSize(), val.get(), 0, val + .getSize()); + totalBytes += key.getSize(); + totalBytes += val.getSize(); + } + timer.stop(); + } + finally { + writer.close(); + } + } + finally { + fout.close(); + } + double duration = (double)timer.read()/1000; // in us. + long fsize = fs.getFileStatus(path).getLen(); + + System.out.printf( + "time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n", + timer.toString(), (double) totalBytes / 1024 / 1024, totalBytes + / duration); + System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n", + timer.toString(), (double) fsize / 1024 / 1024, fsize / duration); + } + + public void seekTFile() throws IOException { + int miss = 0; + long totalBytes = 0; + FSDataInputStream fsdis = fs.open(path); + Reader reader = + new Reader(fsdis, fs.getFileStatus(path).getLen(), conf); + KeySampler kSampler = + new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(), + keyLenGen); + Scanner scanner = reader.createScanner(); + BytesWritable key = new BytesWritable(); + BytesWritable val = new BytesWritable(); + timer.reset(); + timer.start(); + for (int i = 0; i < options.seekCount; ++i) { + kSampler.next(key); + scanner.lowerBound(key.get(), 0, key.getSize()); + if (!scanner.atEnd()) { + scanner.entry().get(key, val); + totalBytes += key.getSize(); + totalBytes += val.getSize(); + } + else { + ++miss; + } + } + timer.stop(); + double duration = (double) timer.read() / 1000; // in us. + System.out.printf( + "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n", + timer.toString(), NanoTimer.nanoTimeToString(timer.read() + / options.seekCount), options.seekCount - miss, miss, + (double) totalBytes / 1024 / (options.seekCount - miss)); + + } + + public void testSeeks() throws IOException { + String[] supported = TFile.getSupportedCompressionAlgorithms(); + boolean proceed = false; + for (String c : supported) { + if (c.equals(options.compress)) { + proceed = true; + break; + } + } + + if (!proceed) { + System.out.println("Skipped for " + options.compress); + return; + } + + if (options.doCreate()) { + createTFile(); + } + + if (options.doRead()) { + seekTFile(); + } + } + + private static class IntegerRange { + private final int from, to; + + public IntegerRange(int from, int to) { + this.from = from; + this.to = to; + } + + public static IntegerRange parse(String s) throws ParseException { + StringTokenizer st = new StringTokenizer(s, " \t,"); + if (st.countTokens() != 2) { + throw new ParseException("Bad integer specification: " + s); + } + int from = Integer.parseInt(st.nextToken()); + int to = Integer.parseInt(st.nextToken()); + return new IntegerRange(from, to); + } + + public int from() { + return from; + } + + public int to() { + return to; + } + } + + private static class MyOptions { + // hard coded constants + int dictSize = 1000; + int minWordLen = 5; + int maxWordLen = 20; + int osInputBufferSize = 64 * 1024; + int osOutputBufferSize = 64 * 1024; + int fsInputBufferSizeNone = 0; + int fsInputBufferSizeLzo = 0; + int fsInputBufferSizeGz = 0; + int fsOutputBufferSizeNone = 1; + int fsOutputBufferSizeLzo = 1; + int fsOutputBufferSizeGz = 1; + + String rootDir = + System.getProperty("test.build.data", "/tmp/tfile-test"); + String file = "TestTFileSeek"; + String compress = "gz"; + int minKeyLen = 10; + int maxKeyLen = 50; + int minValLength = 100; + int maxValLength = 200; + int minBlockSize 
= 64 * 1024; + int fsOutputBufferSize = 1; + int fsInputBufferSize = 0; + long fileSize = 3 * 1024 * 1024; + long seekCount = 1000; + long seed; + + static final int OP_CREATE = 1; + static final int OP_READ = 2; + int op = OP_CREATE | OP_READ; + + boolean proceed = false; + + public MyOptions(String[] args) { + seed = System.nanoTime(); + + try { + Options opts = buildOptions(); + CommandLineParser parser = new GnuParser(); + CommandLine line = parser.parse(opts, args, true); + processOptions(line, opts); + validateOptions(); + } + catch (ParseException e) { + System.out.println(e.getMessage()); + System.out.println("Try \"--help\" option for details."); + setStopProceed(); + } + } + + public boolean proceed() { + return proceed; + } + + private Options buildOptions() { + Option compress = + OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz]") + .hasArg().withDescription("compression scheme").create('c'); + + Option fileSize = + OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB") + .hasArg().withDescription("target size of the file (in MB).") + .create('s'); + + Option fsInputBufferSz = + OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size") + .hasArg().withDescription( + "size of the file system input buffer (in bytes).").create( + 'i'); + + Option fsOutputBufferSize = + OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size") + .hasArg().withDescription( + "size of the file system output buffer (in bytes).").create( + 'o'); + + Option keyLen = + OptionBuilder + .withLongOpt("key-length") + .withArgName("min,max") + .hasArg() + .withDescription( + "the length range of the key (in bytes)") + .create('k'); + + Option valueLen = + OptionBuilder + .withLongOpt("value-length") + .withArgName("min,max") + .hasArg() + .withDescription( + "the length range of the value (in bytes)") + .create('v'); + + Option blockSz = + OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg() + .withDescription("minimum block size (in KB)").create('b'); + + Option seed = + OptionBuilder.withLongOpt("seed").withArgName("long-int").hasArg() + .withDescription("specify the seed").create('S'); + + Option operation = + OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg() + .withDescription( + "action: seek-only, create-only, seek-after-create").create( + 'x'); + + Option rootDir = + OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg() + .withDescription( + "specify root directory where files will be created.") + .create('r'); + + Option file = + OptionBuilder.withLongOpt("file").withArgName("name").hasArg() + .withDescription("specify the file name to be created or read.") + .create('f'); + + Option seekCount = + OptionBuilder + .withLongOpt("seek") + .withArgName("count") + .hasArg() + .withDescription( + "specify how many seek operations we perform (requires -x r or -x rw.") + .create('n'); + + Option help = + OptionBuilder.withLongOpt("help").hasArg(false).withDescription( + "show this screen").create("h"); + + return new Options().addOption(compress).addOption(fileSize).addOption( + fsInputBufferSz).addOption(fsOutputBufferSize).addOption(keyLen) + .addOption(blockSz).addOption(rootDir).addOption(valueLen).addOption( + operation).addOption(seekCount).addOption(file).addOption(help); + + } + + private void processOptions(CommandLine line, Options opts) + throws ParseException { + // --help -h and --version -V must be processed first. 
+ if (line.hasOption('h')) { + HelpFormatter formatter = new HelpFormatter(); + System.out.println("TFile and SeqFile benchmark."); + System.out.println(); + formatter.printHelp(100, + "java ... TestTFileSeqFileComparison [options]", + "\nSupported options:", opts, ""); + return; + } + + if (line.hasOption('c')) { + compress = line.getOptionValue('c'); + } + + if (line.hasOption('d')) { + dictSize = Integer.parseInt(line.getOptionValue('d')); + } + + if (line.hasOption('s')) { + fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024; + } + + if (line.hasOption('i')) { + fsInputBufferSize = Integer.parseInt(line.getOptionValue('i')); + } + + if (line.hasOption('o')) { + fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o')); + } + + if (line.hasOption('n')) { + seekCount = Integer.parseInt(line.getOptionValue('n')); + } + + if (line.hasOption('k')) { + IntegerRange ir = IntegerRange.parse(line.getOptionValue('k')); + minKeyLen = ir.from(); + maxKeyLen = ir.to(); + } + + if (line.hasOption('v')) { + IntegerRange ir = IntegerRange.parse(line.getOptionValue('v')); + minValLength = ir.from(); + maxValLength = ir.to(); + } + + if (line.hasOption('b')) { + minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024; + } + + if (line.hasOption('r')) { + rootDir = line.getOptionValue('r'); + } + + if (line.hasOption('f')) { + file = line.getOptionValue('f'); + } + + if (line.hasOption('S')) { + seed = Long.parseLong(line.getOptionValue('S')); + } + + if (line.hasOption('x')) { + String strOp = line.getOptionValue('x'); + if (strOp.equals("r")) { + op = OP_READ; + } + else if (strOp.equals("w")) { + op = OP_CREATE; + } + else if (strOp.equals("rw")) { + op = OP_CREATE | OP_READ; + } + else { + throw new ParseException("Unknown action specifier: " + strOp); + } + } + + proceed = true; + } + + private void validateOptions() throws ParseException { + if (!compress.equals("none") && !compress.equals("lzo") + && !compress.equals("gz")) { + throw new ParseException("Unknown compression scheme: " + compress); + } + + if (minKeyLen >= maxKeyLen) { + throw new ParseException( + "Max key length must be greater than min key length."); + } + + if (minValLength >= maxValLength) { + throw new ParseException( + "Max value length must be greater than min value length."); + } + + if (minWordLen >= maxWordLen) { + throw new ParseException( + "Max word length must be greater than min word length."); + } + return; + } + + private void setStopProceed() { + proceed = false; + } + + public boolean doCreate() { + return (op & OP_CREATE) != 0; + } + + public boolean doRead() { + return (op & OP_READ) != 0; + } + } + + public static void main(String[] argv) throws IOException { + TestTFileSeek testCase = new TestTFileSeek(); + MyOptions options = new MyOptions(argv); + + if (options.proceed == false) { + return; + } + + testCase.options = options; + testCase.setUp(); + testCase.testSeeks(); + testCase.tearDown(); + } +}
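The TestTFileSeek class added above exposes a main() entry point and a GnuParser-based option set (-c, -s, -n, -x, -r, and so on) for its create/seek benchmark. Purely as an illustrative sketch, and not as code from this patch, it could be driven programmatically as shown below; every flag value is an assumption chosen to mirror the class defaults.

package org.apache.hadoop.io.file.tfile;

import java.io.IOException;

/**
 * Illustrative sketch only (not part of this commit): drives the
 * TestTFileSeek benchmark through its main() method, using the options
 * parsed by MyOptions. All flag values below are assumptions.
 */
public class TFileSeekBenchmarkSketch {
  public static void main(String[] args) throws IOException {
    TestTFileSeek.main(new String[] {
        "-c", "gz",              // compression scheme: none | lzo | gz
        "-s", "3",               // target file size in MB
        "-n", "1000",            // number of seek operations
        "-x", "rw",              // create the file, then seek in it
        "-r", "/tmp/tfile-test"  // root directory for the generated file (assumed)
    });
  }
}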
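Earlier in this commit, TestTFileJClassComparatorByteArrays wires a custom RawComparator (MyComparator) into DTFile through the "jclass: ..." comparator string. As a hedged sketch, and again not code from the patch, the same string could be handed directly to a DTFile.Writer; the output path, block size, and sample key/value pair are assumptions.

package org.apache.hadoop.io.file.tfile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Illustrative sketch only (not part of this commit): writes a single
 * key/value pair with keys ordered by MyComparator, selected via the
 * "jclass:" comparator string used in TestTFileJClassComparatorByteArrays.
 */
public class JClassComparatorWriterSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/jclass-comparator-sketch.dtfile"); // assumed path
    FileSystem fs = path.getFileSystem(conf);
    FSDataOutputStream fout = fs.create(path);
    try {
      DTFile.Writer writer = new DTFile.Writer(fout, 64 * 1024, "none",
          "jclass: org.apache.hadoop.io.file.tfile.MyComparator", conf);
      try {
        byte[] key = "key".getBytes();
        byte[] val = "value".getBytes();
        writer.append(key, 0, key.length, val, 0, val.length);
      } finally {
        writer.close();
      }
    } finally {
      fout.close();
    }
  }
}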
