http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java deleted file mode 100644 index c3d4992..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java +++ /dev/null @@ -1,202 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.*; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.util.CommonTestingUtil; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.UUID; - -import static org.junit.Assert.*; - -public class TestStorageManager { - private TajoConf conf; - private static String TEST_PATH = "target/test-data/TestStorageManager"; - StorageManager sm = null; - private Path testDir; - private FileSystem fs; - - @Before - public void setUp() throws Exception { - conf = new TajoConf(); - testDir = CommonTestingUtil.getTestDir(TEST_PATH); - fs = testDir.getFileSystem(conf); - sm = StorageManager.getStorageManager(conf, testDir); - } - - @After - public void tearDown() throws Exception { - } - - @Test - public final void testGetScannerAndAppender() throws IOException { - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age",Type.INT4); - schema.addColumn("name",Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); - - Tuple[] tuples = new Tuple[4]; - for(int i=0; i < tuples.length; i++) { - tuples[i] = new VTuple(3); - tuples[i].put(new Datum[] { - DatumFactory.createInt4(i), - DatumFactory.createInt4(i + 32), - DatumFactory.createText("name" + i)}); - } - - Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv"); - fs.mkdirs(path.getParent()); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, path); - appender.init(); - for(Tuple t : tuples) { - appender.addTuple(t); - } - appender.close(); - - Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner(meta, schema, path); - scanner.init(); - int i=0; - while(scanner.next() != null) { - i++; - } - assertEquals(4,i); - } - - @Test - public void testGetSplit() throws Exception { - final Configuration conf = new HdfsConfiguration(); - String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); - conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); - conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false); - - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(1).build(); - - int testCount = 10; - Path tablePath = new Path("/testGetSplit"); - try { - DistributedFileSystem fs = cluster.getFileSystem(); - - // Create test partitions - List<Path> partitions = Lists.newArrayList(); - for (int i =0; i < testCount; i++){ - Path tmpFile = new Path(tablePath, String.valueOf(i)); - DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADl); - partitions.add(tmpFile); - } - - assertTrue(fs.exists(tablePath)); - StorageManager sm = StorageManager.getStorageManager(new TajoConf(conf), tablePath); - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age",Type.INT4); - schema.addColumn("name",Type.TEXT); - TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); - - List<FileFragment> splits = Lists.newArrayList(); - // Get FileFragments in partition batch - splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()]))); - assertEquals(testCount, splits.size()); - // -1 is unknown volumeId - assertEquals(-1, splits.get(0).getDiskIds()[0]); - - splits.clear(); - splits.addAll(sm.getSplits("data", meta, schema, - partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2]))); - assertEquals(testCount / 2, splits.size()); - assertEquals(1, splits.get(0).getHosts().length); - assertEquals(-1, splits.get(0).getDiskIds()[0]); - fs.close(); - } finally { - cluster.shutdown(); - - File dir = new File(testDataPath); - dir.delete(); - } - } - - @Test - public void testGetSplitWithBlockStorageLocationsBatching() throws Exception { - final Configuration conf = new HdfsConfiguration(); - String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); - conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); - conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true); - - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(2).build(); - - int testCount = 10; - Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching"); - try { - DistributedFileSystem fs = cluster.getFileSystem(); - - // Create test files - for (int i = 0; i < testCount; i++) { - Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat"); - DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl); - } - assertTrue(fs.exists(tablePath)); - StorageManager sm = StorageManager.getStorageManager(new TajoConf(conf), tablePath); - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT4); - schema.addColumn("name", Type.TEXT); - TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); - - List<FileFragment> splits = Lists.newArrayList(); - splits.addAll(sm.getSplits("data", meta, schema, tablePath)); - - assertEquals(testCount, splits.size()); - assertEquals(2, splits.get(0).getHosts().length); - assertEquals(2, splits.get(0).getDiskIds().length); - assertNotEquals(-1, splits.get(0).getDiskIds()[0]); - fs.close(); - } finally { - cluster.shutdown(); - - File dir = new File(testDataPath); - dir.delete(); - } - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java deleted file mode 100644 index bd1a1f9..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java +++ /dev/null @@ -1,868 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.collect.Lists; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; -import org.apache.tajo.QueryId; -import org.apache.tajo.TajoIdProtos; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.datum.ProtobufDatumFactory; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.rcfile.RCFile; -import org.apache.tajo.storage.sequencefile.SequenceFileScanner; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.KeyValueSet; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -@RunWith(Parameterized.class) -public class TestStorages { - private TajoConf conf; - private static String TEST_PATH = "target/test-data/TestStorages"; - - private static String TEST_PROJECTION_AVRO_SCHEMA = - "{\n" + - " \"type\": \"record\",\n" + - " \"namespace\": \"org.apache.tajo\",\n" + - " \"name\": \"testProjection\",\n" + - " \"fields\": [\n" + - " { \"name\": \"id\", \"type\": \"int\" },\n" + - " { \"name\": \"age\", \"type\": \"long\" },\n" + - " { \"name\": \"score\", \"type\": \"float\" }\n" + - " ]\n" + - "}\n"; - - private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA = - "{\n" + - " \"type\": \"record\",\n" + - " \"namespace\": \"org.apache.tajo\",\n" + - " \"name\": \"testNullHandlingTypes\",\n" + - " \"fields\": [\n" + - " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" + - " { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" + - " { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" + - " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" + - " { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" + - " { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" + - " { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" + - " { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" + - " { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" + - " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" + - " { \"name\": \"col11\", \"type\": \"null\" },\n" + - " { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" + - " ]\n" + - "}\n"; - - private StoreType storeType; - private boolean splitable; - private boolean statsable; - private boolean seekable; - private Path testDir; - private FileSystem fs; - - public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException { - this.storeType = type; - this.splitable = splitable; - this.statsable = statsable; - this.seekable = seekable; - - conf = new TajoConf(); - - if (storeType == StoreType.RCFILE) { - conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100); - } - - testDir = CommonTestingUtil.getTestDir(TEST_PATH); - fs = testDir.getFileSystem(conf); - } - - @Parameterized.Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - //type, splitable, statsable, seekable - {StoreType.CSV, true, true, true}, - {StoreType.RAW, false, true, true}, - {StoreType.RCFILE, true, true, false}, - {StoreType.PARQUET, false, false, false}, - {StoreType.SEQUENCEFILE, true, true, false}, - {StoreType.AVRO, false, false, false}, - {StoreType.TEXTFILE, true, true, false}, - {StoreType.JSON, true, true, false}, - }); - } - - @Test - public void testSplitable() throws IOException { - if (splitable) { - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT8); - - TableMeta meta = CatalogUtil.newTableMeta(storeType); - Path tablePath = new Path(testDir, "Splitable.data"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.enableStats(); - appender.init(); - int tupleNum = 10000; - VTuple vTuple; - - for (int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(2); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createInt8(25l)); - appender.addTuple(vTuple); - } - appender.close(); - TableStats stat = appender.getStats(); - assertEquals(tupleNum, stat.getNumRows().longValue()); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - long randomNum = (long) (Math.random() * fileLen) + 1; - - FileFragment[] tablets = new FileFragment[2]; - tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); - tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); - - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema); - assertTrue(scanner.isSplittable()); - scanner.init(); - int tupleCnt = 0; - while (scanner.next() != null) { - tupleCnt++; - } - scanner.close(); - - scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema); - assertTrue(scanner.isSplittable()); - scanner.init(); - while (scanner.next() != null) { - tupleCnt++; - } - scanner.close(); - - assertEquals(tupleNum, tupleCnt); - } - } - - @Test - public void testRCFileSplitable() throws IOException { - if (storeType == StoreType.RCFILE) { - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT8); - - TableMeta meta = CatalogUtil.newTableMeta(storeType); - Path tablePath = new Path(testDir, "Splitable.data"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.enableStats(); - appender.init(); - int tupleNum = 10000; - VTuple vTuple; - - for (int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(2); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createInt8(25l)); - appender.addTuple(vTuple); - } - appender.close(); - TableStats stat = appender.getStats(); - assertEquals(tupleNum, stat.getNumRows().longValue()); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - long randomNum = 122; // header size - - FileFragment[] tablets = new FileFragment[2]; - tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); - tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); - - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema); - assertTrue(scanner.isSplittable()); - scanner.init(); - int tupleCnt = 0; - while (scanner.next() != null) { - tupleCnt++; - } - scanner.close(); - - scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema); - assertTrue(scanner.isSplittable()); - scanner.init(); - while (scanner.next() != null) { - tupleCnt++; - } - scanner.close(); - - assertEquals(tupleNum, tupleCnt); - } - } - - @Test - public void testProjection() throws IOException { - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT8); - schema.addColumn("score", Type.FLOAT4); - - TableMeta meta = CatalogUtil.newTableMeta(storeType); - meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); - if (storeType == StoreType.AVRO) { - meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, - TEST_PROJECTION_AVRO_SCHEMA); - } - - Path tablePath = new Path(testDir, "testProjection.data"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - int tupleNum = 10000; - VTuple vTuple; - - for (int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(3); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createInt8(i + 2)); - vTuple.put(2, DatumFactory.createFloat4(i + 3)); - appender.addTuple(vTuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen()); - - Schema target = new Schema(); - target.addColumn("age", Type.INT8); - target.addColumn("score", Type.FLOAT4); - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment, target); - scanner.init(); - int tupleCnt = 0; - Tuple tuple; - while ((tuple = scanner.next()) != null) { - if (storeType == StoreType.RCFILE - || storeType == StoreType.CSV - || storeType == StoreType.PARQUET - || storeType == StoreType.SEQUENCEFILE - || storeType == StoreType.AVRO) { - assertTrue(tuple.get(0) == null); - } - assertTrue(tupleCnt + 2 == tuple.get(1).asInt8()); - assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4()); - tupleCnt++; - } - scanner.close(); - - assertEquals(tupleNum, tupleCnt); - } - - @Test - public void testVariousTypes() throws IOException { - boolean handleProtobuf = storeType != StoreType.JSON; - - Schema schema = new Schema(); - schema.addColumn("col1", Type.BOOLEAN); - schema.addColumn("col2", Type.CHAR, 7); - schema.addColumn("col3", Type.INT2); - schema.addColumn("col4", Type.INT4); - schema.addColumn("col5", Type.INT8); - schema.addColumn("col6", Type.FLOAT4); - schema.addColumn("col7", Type.FLOAT8); - schema.addColumn("col8", Type.TEXT); - schema.addColumn("col9", Type.BLOB); - schema.addColumn("col10", Type.INET4); - schema.addColumn("col11", Type.NULL_TYPE); - if (handleProtobuf) { - schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); - } - - KeyValueSet options = new KeyValueSet(); - TableMeta meta = CatalogUtil.newTableMeta(storeType, options); - meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); - if (storeType == StoreType.AVRO) { - String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString(); - meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path); - } - - Path tablePath = new Path(testDir, "testVariousTypes.data"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - - QueryId queryid = new QueryId("12345", 5); - ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); - - Tuple tuple = new VTuple(11 + (handleProtobuf ? 1 : 0)); - tuple.put(new Datum[] { - DatumFactory.createBool(true), - DatumFactory.createChar("hyunsik"), - DatumFactory.createInt2((short) 17), - DatumFactory.createInt4(59), - DatumFactory.createInt8(23l), - DatumFactory.createFloat4(77.9f), - DatumFactory.createFloat8(271.9f), - DatumFactory.createText("hyunsik"), - DatumFactory.createBlob("hyunsik".getBytes()), - DatumFactory.createInet4("192.168.0.1"), - NullDatum.get() - }); - if (handleProtobuf) { - tuple.put(11, factory.createDatum(queryid.getProto())); - } - - appender.addTuple(tuple); - appender.flush(); - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment); - scanner.init(); - - Tuple retrieved; - while ((retrieved = scanner.next()) != null) { - for (int i = 0; i < tuple.size(); i++) { - assertEquals(tuple.get(i), retrieved.get(i)); - } - } - scanner.close(); - } - - @Test - public void testNullHandlingTypes() throws IOException { - boolean handleProtobuf = storeType != StoreType.JSON; - - Schema schema = new Schema(); - schema.addColumn("col1", Type.BOOLEAN); - schema.addColumn("col2", Type.CHAR, 7); - schema.addColumn("col3", Type.INT2); - schema.addColumn("col4", Type.INT4); - schema.addColumn("col5", Type.INT8); - schema.addColumn("col6", Type.FLOAT4); - schema.addColumn("col7", Type.FLOAT8); - schema.addColumn("col8", Type.TEXT); - schema.addColumn("col9", Type.BLOB); - schema.addColumn("col10", Type.INET4); - schema.addColumn("col11", Type.NULL_TYPE); - - if (handleProtobuf) { - schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); - } - - KeyValueSet options = new KeyValueSet(); - TableMeta meta = CatalogUtil.newTableMeta(storeType, options); - meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); - meta.putOption(StorageConstants.TEXT_NULL, "\\\\N"); - meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N"); - meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName()); - meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\"); - if (storeType == StoreType.AVRO) { - meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, - TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA); - } - - Path tablePath = new Path(testDir, "testVariousTypes.data"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - - QueryId queryid = new QueryId("12345", 5); - ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); - int columnNum = 11 + (handleProtobuf ? 1 : 0); - Tuple seedTuple = new VTuple(columnNum); - seedTuple.put(new Datum[]{ - DatumFactory.createBool(true), // 0 - DatumFactory.createChar("hyunsik"), // 2 - DatumFactory.createInt2((short) 17), // 3 - DatumFactory.createInt4(59), // 4 - DatumFactory.createInt8(23l), // 5 - DatumFactory.createFloat4(77.9f), // 6 - DatumFactory.createFloat8(271.9f), // 7 - DatumFactory.createText("hyunsik"), // 8 - DatumFactory.createBlob("hyunsik".getBytes()),// 9 - DatumFactory.createInet4("192.168.0.1"), // 10 - NullDatum.get(), // 11 - }); - - if (handleProtobuf) { - seedTuple.put(11, factory.createDatum(queryid.getProto())); // 12 - } - - // Making tuples with different null column positions - Tuple tuple; - for (int i = 0; i < columnNum; i++) { - tuple = new VTuple(columnNum); - for (int j = 0; j < columnNum; j++) { - if (i == j) { // i'th column will have NULL value - tuple.put(j, NullDatum.get()); - } else { - tuple.put(j, seedTuple.get(j)); - } - } - appender.addTuple(tuple); - } - appender.flush(); - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment); - scanner.init(); - - Tuple retrieved; - int i = 0; - while ((retrieved = scanner.next()) != null) { - assertEquals(columnNum, retrieved.size()); - for (int j = 0; j < columnNum; j++) { - if (i == j) { - assertEquals(NullDatum.get(), retrieved.get(j)); - } else { - assertEquals(seedTuple.get(j), retrieved.get(j)); - } - } - - i++; - } - scanner.close(); - } - - @Test - public void testRCFileTextSerializeDeserialize() throws IOException { - if(storeType != StoreType.RCFILE) return; - - Schema schema = new Schema(); - schema.addColumn("col1", Type.BOOLEAN); - schema.addColumn("col2", Type.BIT); - schema.addColumn("col3", Type.CHAR, 7); - schema.addColumn("col4", Type.INT2); - schema.addColumn("col5", Type.INT4); - schema.addColumn("col6", Type.INT8); - schema.addColumn("col7", Type.FLOAT4); - schema.addColumn("col8", Type.FLOAT8); - schema.addColumn("col9", Type.TEXT); - schema.addColumn("col10", Type.BLOB); - schema.addColumn("col11", Type.INET4); - schema.addColumn("col12", Type.NULL_TYPE); - schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); - - KeyValueSet options = new KeyValueSet(); - TableMeta meta = CatalogUtil.newTableMeta(storeType, options); - meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName()); - - Path tablePath = new Path(testDir, "testVariousTypes.data"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.enableStats(); - appender.init(); - - QueryId queryid = new QueryId("12345", 5); - ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); - - Tuple tuple = new VTuple(13); - tuple.put(new Datum[] { - DatumFactory.createBool(true), - DatumFactory.createBit((byte) 0x99), - DatumFactory.createChar("jinho"), - DatumFactory.createInt2((short) 17), - DatumFactory.createInt4(59), - DatumFactory.createInt8(23l), - DatumFactory.createFloat4(77.9f), - DatumFactory.createFloat8(271.9f), - DatumFactory.createText("jinho"), - DatumFactory.createBlob("hyunsik babo".getBytes()), - DatumFactory.createInet4("192.168.0.1"), - NullDatum.get(), - factory.createDatum(queryid.getProto()) - }); - appender.addTuple(tuple); - appender.flush(); - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); - - FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment); - scanner.init(); - - Tuple retrieved; - while ((retrieved=scanner.next()) != null) { - for (int i = 0; i < tuple.size(); i++) { - assertEquals(tuple.get(i), retrieved.get(i)); - } - } - scanner.close(); - assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); - assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); - } - - @Test - public void testRCFileBinarySerializeDeserialize() throws IOException { - if(storeType != StoreType.RCFILE) return; - - Schema schema = new Schema(); - schema.addColumn("col1", Type.BOOLEAN); - schema.addColumn("col2", Type.BIT); - schema.addColumn("col3", Type.CHAR, 7); - schema.addColumn("col4", Type.INT2); - schema.addColumn("col5", Type.INT4); - schema.addColumn("col6", Type.INT8); - schema.addColumn("col7", Type.FLOAT4); - schema.addColumn("col8", Type.FLOAT8); - schema.addColumn("col9", Type.TEXT); - schema.addColumn("col10", Type.BLOB); - schema.addColumn("col11", Type.INET4); - schema.addColumn("col12", Type.NULL_TYPE); - schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); - - KeyValueSet options = new KeyValueSet(); - TableMeta meta = CatalogUtil.newTableMeta(storeType, options); - meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName()); - - Path tablePath = new Path(testDir, "testVariousTypes.data"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.enableStats(); - appender.init(); - - QueryId queryid = new QueryId("12345", 5); - ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); - - Tuple tuple = new VTuple(13); - tuple.put(new Datum[] { - DatumFactory.createBool(true), - DatumFactory.createBit((byte) 0x99), - DatumFactory.createChar("jinho"), - DatumFactory.createInt2((short) 17), - DatumFactory.createInt4(59), - DatumFactory.createInt8(23l), - DatumFactory.createFloat4(77.9f), - DatumFactory.createFloat8(271.9f), - DatumFactory.createText("jinho"), - DatumFactory.createBlob("hyunsik babo".getBytes()), - DatumFactory.createInet4("192.168.0.1"), - NullDatum.get(), - factory.createDatum(queryid.getProto()) - }); - appender.addTuple(tuple); - appender.flush(); - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); - - FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment); - scanner.init(); - - Tuple retrieved; - while ((retrieved=scanner.next()) != null) { - for (int i = 0; i < tuple.size(); i++) { - assertEquals(tuple.get(i), retrieved.get(i)); - } - } - scanner.close(); - assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); - assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); - } - - @Test - public void testSequenceFileTextSerializeDeserialize() throws IOException { - if(storeType != StoreType.SEQUENCEFILE) return; - - Schema schema = new Schema(); - schema.addColumn("col1", Type.BOOLEAN); - schema.addColumn("col2", Type.BIT); - schema.addColumn("col3", Type.CHAR, 7); - schema.addColumn("col4", Type.INT2); - schema.addColumn("col5", Type.INT4); - schema.addColumn("col6", Type.INT8); - schema.addColumn("col7", Type.FLOAT4); - schema.addColumn("col8", Type.FLOAT8); - schema.addColumn("col9", Type.TEXT); - schema.addColumn("col10", Type.BLOB); - schema.addColumn("col11", Type.INET4); - schema.addColumn("col12", Type.NULL_TYPE); - schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); - - KeyValueSet options = new KeyValueSet(); - TableMeta meta = CatalogUtil.newTableMeta(storeType, options); - meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); - - Path tablePath = new Path(testDir, "testVariousTypes.data"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.enableStats(); - appender.init(); - - QueryId queryid = new QueryId("12345", 5); - ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); - - Tuple tuple = new VTuple(13); - tuple.put(new Datum[] { - DatumFactory.createBool(true), - DatumFactory.createBit((byte) 0x99), - DatumFactory.createChar("jinho"), - DatumFactory.createInt2((short) 17), - DatumFactory.createInt4(59), - DatumFactory.createInt8(23l), - DatumFactory.createFloat4(77.9f), - DatumFactory.createFloat8(271.9f), - DatumFactory.createText("jinho"), - DatumFactory.createBlob("hyunsik babo".getBytes()), - DatumFactory.createInet4("192.168.0.1"), - NullDatum.get(), - factory.createDatum(queryid.getProto()) - }); - appender.addTuple(tuple); - appender.flush(); - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); - - FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment); - scanner.init(); - - assertTrue(scanner instanceof SequenceFileScanner); - Writable key = ((SequenceFileScanner) scanner).getKey(); - assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); - - Tuple retrieved; - while ((retrieved=scanner.next()) != null) { - for (int i = 0; i < tuple.size(); i++) { - assertEquals(tuple.get(i), retrieved.get(i)); - } - } - scanner.close(); - assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); - assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); - } - - @Test - public void testSequenceFileBinarySerializeDeserialize() throws IOException { - if(storeType != StoreType.SEQUENCEFILE) return; - - Schema schema = new Schema(); - schema.addColumn("col1", Type.BOOLEAN); - schema.addColumn("col2", Type.BIT); - schema.addColumn("col3", Type.CHAR, 7); - schema.addColumn("col4", Type.INT2); - schema.addColumn("col5", Type.INT4); - schema.addColumn("col6", Type.INT8); - schema.addColumn("col7", Type.FLOAT4); - schema.addColumn("col8", Type.FLOAT8); - schema.addColumn("col9", Type.TEXT); - schema.addColumn("col10", Type.BLOB); - schema.addColumn("col11", Type.INET4); - schema.addColumn("col12", Type.NULL_TYPE); - schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); - - KeyValueSet options = new KeyValueSet(); - TableMeta meta = CatalogUtil.newTableMeta(storeType, options); - meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName()); - - Path tablePath = new Path(testDir, "testVariousTypes.data"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.enableStats(); - appender.init(); - - QueryId queryid = new QueryId("12345", 5); - ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); - - Tuple tuple = new VTuple(13); - tuple.put(new Datum[] { - DatumFactory.createBool(true), - DatumFactory.createBit((byte) 0x99), - DatumFactory.createChar("jinho"), - DatumFactory.createInt2((short) 17), - DatumFactory.createInt4(59), - DatumFactory.createInt8(23l), - DatumFactory.createFloat4(77.9f), - DatumFactory.createFloat8(271.9f), - DatumFactory.createText("jinho"), - DatumFactory.createBlob("hyunsik babo".getBytes()), - DatumFactory.createInet4("192.168.0.1"), - NullDatum.get(), - factory.createDatum(queryid.getProto()) - }); - appender.addTuple(tuple); - appender.flush(); - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); - - FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment); - scanner.init(); - - assertTrue(scanner instanceof SequenceFileScanner); - Writable key = ((SequenceFileScanner) scanner).getKey(); - assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName()); - - Tuple retrieved; - while ((retrieved=scanner.next()) != null) { - for (int i = 0; i < tuple.size(); i++) { - assertEquals(tuple.get(i), retrieved.get(i)); - } - } - scanner.close(); - assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); - assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); - } - - @Test - public void testTime() throws IOException { - if (storeType == StoreType.CSV || storeType == StoreType.RAW) { - Schema schema = new Schema(); - schema.addColumn("col1", Type.DATE); - schema.addColumn("col2", Type.TIME); - schema.addColumn("col3", Type.TIMESTAMP); - - KeyValueSet options = new KeyValueSet(); - TableMeta meta = CatalogUtil.newTableMeta(storeType, options); - - Path tablePath = new Path(testDir, "testTime.data"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - - Tuple tuple = new VTuple(3); - tuple.put(new Datum[]{ - DatumFactory.createDate("1980-04-01"), - DatumFactory.createTime("12:34:56"), - DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000)) - }); - appender.addTuple(tuple); - appender.flush(); - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment); - scanner.init(); - - Tuple retrieved; - while ((retrieved = scanner.next()) != null) { - for (int i = 0; i < tuple.size(); i++) { - assertEquals(tuple.get(i), retrieved.get(i)); - } - } - scanner.close(); - } - } - - @Test - public void testSeekableScanner() throws IOException { - if (!seekable) { - return; - } - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT8); - schema.addColumn("comment", Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(storeType); - Path tablePath = new Path(testDir, "Seekable.data"); - FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema, - tablePath); - appender.enableStats(); - appender.init(); - int tupleNum = 100000; - VTuple vTuple; - - List<Long> offsets = Lists.newArrayList(); - offsets.add(0L); - for (int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(3); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createInt8(25l)); - vTuple.put(2, DatumFactory.createText("test" + i)); - appender.addTuple(vTuple); - - // find a seek position - if (i % (tupleNum / 3) == 0) { - offsets.add(appender.getOffset()); - } - } - - // end of file - if (!offsets.contains(appender.getOffset())) { - offsets.add(appender.getOffset()); - } - - appender.close(); - if (statsable) { - TableStats stat = appender.getStats(); - assertEquals(tupleNum, stat.getNumRows().longValue()); - } - - FileStatus status = fs.getFileStatus(tablePath); - assertEquals(status.getLen(), appender.getOffset()); - - Scanner scanner; - int tupleCnt = 0; - long prevOffset = 0; - long readBytes = 0; - long readRows = 0; - for (long offset : offsets) { - scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, - new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema); - scanner.init(); - - while (scanner.next() != null) { - tupleCnt++; - } - - scanner.close(); - if (statsable) { - readBytes += scanner.getInputStats().getNumBytes(); - readRows += scanner.getInputStats().getNumRows(); - } - prevOffset = offset; - } - - assertEquals(tupleNum, tupleCnt); - if (statsable) { - assertEquals(appender.getStats().getNumBytes().longValue(), readBytes); - assertEquals(appender.getStats().getNumRows().longValue(), readRows); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java deleted file mode 100644 index 639ca04..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class TestTupleComparator { - - @Before - public void setUp() throws Exception { - } - - @After - public void tearDown() throws Exception { - } - - @Test - public final void testCompare() { - Schema schema = new Schema(); - schema.addColumn("col1", Type.INT4); - schema.addColumn("col2", Type.INT4); - schema.addColumn("col3", Type.INT4); - schema.addColumn("col4", Type.INT4); - schema.addColumn("col5", Type.TEXT); - - Tuple tuple1 = new VTuple(5); - Tuple tuple2 = new VTuple(5); - - tuple1.put( - new Datum[] { - DatumFactory.createInt4(9), - DatumFactory.createInt4(3), - DatumFactory.createInt4(33), - DatumFactory.createInt4(4), - DatumFactory.createText("abc")}); - tuple2.put( - new Datum[] { - DatumFactory.createInt4(1), - DatumFactory.createInt4(25), - DatumFactory.createInt4(109), - DatumFactory.createInt4(4), - DatumFactory.createText("abd")}); - - SortSpec sortKey1 = new SortSpec(schema.getColumn("col4"), true, false); - SortSpec sortKey2 = new SortSpec(schema.getColumn("col5"), true, false); - - BaseTupleComparator tc = new BaseTupleComparator(schema, - new SortSpec[] {sortKey1, sortKey2}); - assertEquals(-1, tc.compare(tuple1, tuple2)); - assertEquals(1, tc.compare(tuple2, tuple1)); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java deleted file mode 100644 index 9837fd1..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - - -import org.junit.Before; -import org.junit.Test; -import org.apache.tajo.datum.DatumFactory; - -import static org.junit.Assert.*; - -public class TestVTuple { - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - - } - - @Test - public void testContain() { - VTuple t1 = new VTuple(260); - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(1)); - t1.put(27, DatumFactory.createInt4(1)); - t1.put(96, DatumFactory.createInt4(1)); - t1.put(257, DatumFactory.createInt4(1)); - - assertTrue(t1.contains(0)); - assertTrue(t1.contains(1)); - assertFalse(t1.contains(2)); - assertFalse(t1.contains(3)); - assertFalse(t1.contains(4)); - assertTrue(t1.contains(27)); - assertFalse(t1.contains(28)); - assertFalse(t1.contains(95)); - assertTrue(t1.contains(96)); - assertFalse(t1.contains(97)); - assertTrue(t1.contains(257)); - } - - @Test - public void testPut() { - VTuple t1 = new VTuple(260); - t1.put(0, DatumFactory.createText("str")); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(257, DatumFactory.createFloat4(0.76f)); - - assertTrue(t1.contains(0)); - assertTrue(t1.contains(1)); - - assertEquals(t1.getText(0),"str"); - assertEquals(t1.get(1).asInt4(),2); - assertTrue(t1.get(257).asFloat4() == 0.76f); - } - - @Test - public void testEquals() { - Tuple t1 = new VTuple(5); - Tuple t2 = new VTuple(5); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(3, DatumFactory.createInt4(2)); - - t2.put(0, DatumFactory.createInt4(1)); - t2.put(1, DatumFactory.createInt4(2)); - t2.put(3, DatumFactory.createInt4(2)); - - assertEquals(t1,t2); - - Tuple t3 = new VTuple(5); - t2.put(0, DatumFactory.createInt4(1)); - t2.put(1, DatumFactory.createInt4(2)); - t2.put(4, DatumFactory.createInt4(2)); - - assertNotSame(t1,t3); - } - - @Test - public void testHashCode() { - Tuple t1 = new VTuple(5); - Tuple t2 = new VTuple(5); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(3, DatumFactory.createInt4(2)); - t1.put(4, DatumFactory.createText("hyunsik")); - - t2.put(0, DatumFactory.createInt4(1)); - t2.put(1, DatumFactory.createInt4(2)); - t2.put(3, DatumFactory.createInt4(2)); - t2.put(4, DatumFactory.createText("hyunsik")); - - assertEquals(t1.hashCode(),t2.hashCode()); - - Tuple t3 = new VTuple(5); - t3.put(0, DatumFactory.createInt4(1)); - t3.put(1, DatumFactory.createInt4(2)); - t3.put(4, DatumFactory.createInt4(2)); - - assertNotSame(t1.hashCode(),t3.hashCode()); - } - - @Test - public void testPutTuple() { - Tuple t1 = new VTuple(5); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(2, DatumFactory.createInt4(3)); - - Tuple t2 = new VTuple(2); - t2.put(0, DatumFactory.createInt4(4)); - t2.put(1, DatumFactory.createInt4(5)); - - t1.put(3, t2); - - for (int i = 0; i < 5; i++) { - assertEquals(i+1, t1.get(i).asInt4()); - } - } - - @Test - public void testClone() throws CloneNotSupportedException { - Tuple t1 = new VTuple(5); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(3, DatumFactory.createInt4(2)); - t1.put(4, DatumFactory.createText("str")); - - VTuple t2 = (VTuple) t1.clone(); - assertNotSame(t1, t2); - assertEquals(t1, t2); - - assertSame(t1.get(4), t2.get(4)); - - t1.clear(); - assertFalse(t1.equals(t2)); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java deleted file mode 100644 index a79e8ab..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.avro; - -import org.apache.avro.Schema; -import org.apache.tajo.HttpFileServer; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.NetUtils; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URISyntaxException; -import java.net.URL; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** - * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}. - */ -public class TestAvroUtil { - private Schema expected; - private URL schemaUrl; - - @Before - public void setUp() throws Exception { - schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc"); - assertNotNull(schemaUrl); - - File file = new File(schemaUrl.getPath()); - assertTrue(file.exists()); - - expected = new Schema.Parser().parse(file); - } - - @Test - public void testGetSchema() throws IOException, URISyntaxException { - TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); - meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, FileUtil.readTextFile(new File(schemaUrl.getPath()))); - Schema schema = AvroUtil.getAvroSchema(meta, new TajoConf()); - assertEquals(expected, schema); - - meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); - meta.putOption(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath()); - schema = AvroUtil.getAvroSchema(meta, new TajoConf()); - assertEquals(expected, schema); - - HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0")); - try { - server.start(); - InetSocketAddress addr = server.getBindAddress(); - - String url = "http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath(); - meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); - meta.putOption(StorageConstants.AVRO_SCHEMA_URL, url); - schema = AvroUtil.getAvroSchema(meta, new TajoConf()); - } finally { - server.stop(); - } - assertEquals(expected, schema); - } - - @Test - public void testGetSchemaFromHttp() throws IOException, URISyntaxException { - HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0")); - try { - server.start(); - InetSocketAddress addr = server.getBindAddress(); - - Schema schema = AvroUtil.getAvroSchemaFromHttp("http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath()); - assertEquals(expected, schema); - } finally { - server.stop(); - } - } - - @Test - public void testGetSchemaFromFileSystem() throws IOException, URISyntaxException { - Schema schema = AvroUtil.getAvroSchemaFromFileSystem(schemaUrl.toString(), new TajoConf()); - - assertEquals(expected, schema); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java deleted file mode 100644 index 7900195..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ /dev/null @@ -1,946 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.index; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.index.bst.BSTIndex; -import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader; -import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter; -import org.apache.tajo.util.CommonTestingUtil; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Random; - -import static org.junit.Assert.*; - -@RunWith(Parameterized.class) -public class TestBSTIndex { - private TajoConf conf; - private Schema schema; - private TableMeta meta; - - private static final int TUPLE_NUM = 10000; - private static final int LOAD_NUM = 100; - private static final String TEST_PATH = "target/test-data/TestIndex"; - private Path testDir; - private FileSystem fs; - private StoreType storeType; - - public TestBSTIndex(StoreType type) { - this.storeType = type; - conf = new TajoConf(); - conf.setVar(TajoConf.ConfVars.ROOT_DIR, TEST_PATH); - schema = new Schema(); - schema.addColumn(new Column("int", Type.INT4)); - schema.addColumn(new Column("long", Type.INT8)); - schema.addColumn(new Column("double", Type.FLOAT8)); - schema.addColumn(new Column("float", Type.FLOAT4)); - schema.addColumn(new Column("string", Type.TEXT)); - } - - - @Parameterized.Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][]{ - {StoreType.CSV}, - {StoreType.RAW} - }); - } - - @Before - public void setUp() throws Exception { - testDir = CommonTestingUtil.getTestDir(TEST_PATH); - fs = testDir.getFileSystem(conf); - } - - @Test - public void testFindValue() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindValue_" + storeType); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for (int i = 0; i < TUPLE_NUM; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("long", Type.INT8)); - keySchema.addColumn(new Column("double", Type.FLOAT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, - keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(1)); - keyTuple.put(1, tuple.get(2)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - tuple = new VTuple(keySchema.size()); - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp); - reader.open(); - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - for (int i = 0; i < TUPLE_NUM - 1; i++) { - tuple.put(0, DatumFactory.createInt8(i)); - tuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(tuple); - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8())); - assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8())); - - offsets = reader.next(); - if (offsets == -1) { - continue; - } - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8())); - } - reader.close(); - scanner.close(); - } - - @Test - public void testBuildIndexWithAppender() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType); - FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema, - tablePath); - appender.init(); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("long", Type.INT8)); - keySchema.addColumn(new Column("double", Type.FLOAT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - Tuple tuple; - long offset; - for (int i = 0; i < TUPLE_NUM; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - - offset = appender.getOffset(); - appender.addTuple(tuple); - creater.write(tuple, offset); - } - appender.flush(); - appender.close(); - - creater.flush(); - creater.close(); - - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - tuple = new VTuple(keySchema.size()); - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - for (int i = 0; i < TUPLE_NUM - 1; i++) { - tuple.put(0, DatumFactory.createInt8(i)); - tuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(tuple); - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8())); - assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(2).asFloat8())); - - offsets = reader.next(); - if (offsets == -1) { - continue; - } - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8())); - } - reader.close(); - scanner.close(); - } - - @Test - public void testFindOmittedValue() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for (int i = 0; i < TUPLE_NUM; i += 2) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen()); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("long", Type.INT8)); - keySchema.addColumn(new Column("double", Type.FLOAT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(1)); - keyTuple.put(1, tuple.get(2)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - for (int i = 1; i < TUPLE_NUM - 1; i += 2) { - keyTuple.put(0, DatumFactory.createInt8(i)); - keyTuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(keyTuple); - assertEquals(-1, offsets); - } - reader.close(); - } - - @Test - public void testFindNextKeyValue() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for (int i = 0; i < TUPLE_NUM; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("int", Type.INT4)); - keySchema.addColumn(new Column("long", Type.INT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(0)); - keyTuple.put(1, tuple.get(1)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple result; - for (int i = 0; i < TUPLE_NUM - 1; i++) { - keyTuple = new VTuple(2); - keyTuple.put(0, DatumFactory.createInt4(i)); - keyTuple.put(1, DatumFactory.createInt8(i)); - long offsets = reader.find(keyTuple, true); - scanner.seek(offsets); - result = scanner.next(); - assertTrue("[seek check " + (i + 1) + " ]", - (i + 1) == (result.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8())); - - offsets = reader.next(); - if (offsets == -1) { - continue; - } - scanner.seek(offsets); - result = scanner.next(); - assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(0).asInt8())); - assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(1).asFloat8())); - } - reader.close(); - scanner.close(); - } - - @Test - public void testFindNextKeyOmittedValue() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for (int i = 0; i < TUPLE_NUM; i += 2) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("int", Type.INT4)); - keySchema.addColumn(new Column("long", Type.INT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(0)); - keyTuple.put(1, tuple.get(1)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple result; - for (int i = 1; i < TUPLE_NUM - 1; i += 2) { - keyTuple = new VTuple(2); - keyTuple.put(0, DatumFactory.createInt4(i)); - keyTuple.put(1, DatumFactory.createInt8(i)); - long offsets = reader.find(keyTuple, true); - scanner.seek(offsets); - result = scanner.next(); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8())); - } - scanner.close(); - } - - @Test - public void testFindMinValue() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindMinValue" + storeType); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - - Tuple tuple; - for (int i = 5; i < TUPLE_NUM + 5; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("long", Type.INT8)); - keySchema.addColumn(new Column("double", Type.FLOAT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(1)); - keyTuple.put(1, tuple.get(2)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - tuple = new VTuple(keySchema.size()); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - tuple.put(0, DatumFactory.createInt8(0)); - tuple.put(1, DatumFactory.createFloat8(0)); - - offset = reader.find(tuple); - assertEquals(-1, offset); - - offset = reader.find(tuple, true); - assertTrue(offset >= 0); - scanner.seek(offset); - tuple = scanner.next(); - assertEquals(5, tuple.get(1).asInt4()); - assertEquals(5l, tuple.get(2).asInt8()); - reader.close(); - scanner.close(); - } - - @Test - public void testMinMax() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testMinMax_" + storeType); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for (int i = 5; i < TUPLE_NUM; i += 2) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("int", Type.INT4)); - keySchema.addColumn(new Column("long", Type.INT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(0)); - keyTuple.put(1, tuple.get(1)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - - Tuple min = reader.getFirstKey(); - assertEquals(5, min.get(0).asInt4()); - assertEquals(5l, min.get(0).asInt8()); - - Tuple max = reader.getLastKey(); - assertEquals(TUPLE_NUM - 1, max.get(0).asInt4()); - assertEquals(TUPLE_NUM - 1, max.get(0).asInt8()); - reader.close(); - } - - private class ConcurrentAccessor implements Runnable { - final BSTIndexReader reader; - final Random rnd = new Random(System.currentTimeMillis()); - boolean failed = false; - - ConcurrentAccessor(BSTIndexReader reader) { - this.reader = reader; - } - - public boolean isFailed() { - return this.failed; - } - - @Override - public void run() { - Tuple findKey = new VTuple(2); - int keyVal; - for (int i = 0; i < 10000; i++) { - keyVal = rnd.nextInt(10000); - findKey.put(0, DatumFactory.createInt4(keyVal)); - findKey.put(1, DatumFactory.createInt8(keyVal)); - try { - assertTrue(reader.find(findKey) != -1); - } catch (Exception e) { - e.printStackTrace(); - this.failed = true; - } - } - } - } - - @Test - public void testConcurrentAccess() throws IOException, InterruptedException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - - Tuple tuple; - for (int i = 0; i < TUPLE_NUM; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("int", Type.INT4)); - keySchema.addColumn(new Column("long", Type.INT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(0)); - keyTuple.put(1, tuple.get(1)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - - Thread[] threads = new Thread[5]; - ConcurrentAccessor[] accs = new ConcurrentAccessor[5]; - for (int i = 0; i < threads.length; i++) { - accs[i] = new ConcurrentAccessor(reader); - threads[i] = new Thread(accs[i]); - threads[i].start(); - } - - for (int i = 0; i < threads.length; i++) { - threads[i].join(); - assertFalse(accs[i].isFailed()); - } - reader.close(); - } - - - @Test - public void testFindValueDescOrder() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - - Tuple tuple; - for (int i = (TUPLE_NUM - 1); i >= 0; i--) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("long"), false, false); - sortKeys[1] = new SortSpec(schema.getColumn("double"), false, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("long", Type.INT8)); - keySchema.addColumn(new Column("double", Type.FLOAT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(1)); - keyTuple.put(1, tuple.get(2)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - tuple = new VTuple(keySchema.size()); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - for (int i = (TUPLE_NUM - 1); i > 0; i--) { - tuple.put(0, DatumFactory.createInt8(i)); - tuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(tuple); - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8())); - assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8())); - - offsets = reader.next(); - if (offsets == -1) { - continue; - } - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(0).asInt4())); - assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(1).asInt8())); - } - reader.close(); - scanner.close(); - } - - @Test - public void testFindNextKeyValueDescOrder() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - - Tuple tuple; - for (int i = (TUPLE_NUM - 1); i >= 0; i--) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("int"), false, false); - sortKeys[1] = new SortSpec(schema.getColumn("long"), false, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("int", Type.INT4)); - keySchema.addColumn(new Column("long", Type.INT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, - "testFindNextKeyValueDescOrder_" + storeType + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(0)); - keyTuple.put(1, tuple.get(1)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - - assertEquals(keySchema, reader.getKeySchema()); - assertEquals(comp, reader.getComparator()); - - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple result; - for (int i = (TUPLE_NUM - 1); i > 0; i--) { - keyTuple = new VTuple(2); - keyTuple.put(0, DatumFactory.createInt4(i)); - keyTuple.put(1, DatumFactory.createInt8(i)); - long offsets = reader.find(keyTuple, true); - scanner.seek(offsets); - result = scanner.next(); - assertTrue("[seek check " + (i - 1) + " ]", - (i - 1) == (result.get(0).asInt4())); - assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (result.get(1).asInt8())); - - offsets = reader.next(); - if (offsets == -1) { - continue; - } - scanner.seek(offsets); - result = scanner.next(); - assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(0).asInt8())); - assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(1).asFloat8())); - } - reader.close(); - scanner.close(); - } -}
