http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java new file mode 100644 index 0000000..4081a80 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -0,0 +1,867 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.rcfile.RCFile; +import org.apache.tajo.storage.sequencefile.SequenceFileScanner; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.KeyValueSet; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class TestStorages { + private TajoConf conf; + private static String TEST_PATH = "target/test-data/TestStorages"; + + private static String TEST_PROJECTION_AVRO_SCHEMA = + "{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testProjection\",\n" + + " \"fields\": [\n" + + " { \"name\": \"id\", \"type\": \"int\" },\n" + + " { \"name\": \"age\", \"type\": \"long\" },\n" + + " { \"name\": \"score\", 
\"type\": \"float\" }\n" + + " ]\n" + + "}\n"; + + private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA = + "{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testNullHandlingTypes\",\n" + + " \"fields\": [\n" + + " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" + + " { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" + + " { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" + + " { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" + + " { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" + + " { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" + + " { \"name\": \"col12\", \"type\": \"null\" },\n" + + " { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" + + " ]\n" + + "}\n"; + + private StoreType storeType; + private boolean splitable; + private boolean statsable; + private boolean seekable; + private Path testDir; + private FileSystem fs; + + public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException { + this.storeType = type; + this.splitable = splitable; + this.statsable = statsable; + this.seekable = seekable; + + conf = new TajoConf(); + + if (storeType == StoreType.RCFILE) { + conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100); + } + + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + fs = testDir.getFileSystem(conf); + } + + @Parameterized.Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][] { + //type, splitable, statsable, seekable + {StoreType.CSV, true, true, true}, + {StoreType.RAW, false, true, true}, + 
{StoreType.RCFILE, true, true, false}, + {StoreType.PARQUET, false, false, false}, + {StoreType.SEQUENCEFILE, true, true, false}, + {StoreType.AVRO, false, false, false}, + {StoreType.TEXTFILE, true, true, false}, + }); + } + + @Test + public void testSplitable() throws IOException { + if (splitable) { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Splitable.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(2); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + appender.addTuple(vTuple); + } + appender.close(); + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + long randomNum = (long) (Math.random() * fileLen) + 1; + + FileFragment[] tablets = new FileFragment[2]; + tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); + tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); + + Scanner scanner = sm.getScanner(meta, schema, tablets[0], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + int tupleCnt = 0; + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + scanner = sm.getScanner(meta, schema, tablets[1], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + } + } + + @Test + public void testRCFileSplitable() throws IOException { + if (storeType == StoreType.RCFILE) { + 
Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Splitable.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(2); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + appender.addTuple(vTuple); + } + appender.close(); + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + long randomNum = 122; // header size + + FileFragment[] tablets = new FileFragment[2]; + tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); + tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); + + Scanner scanner = sm.getScanner(meta, schema, tablets[0], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + int tupleCnt = 0; + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + scanner = sm.getScanner(meta, schema, tablets[1], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + } + } + + @Test + public void testProjection() throws IOException { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("score", Type.FLOAT4); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); + if (storeType == StoreType.AVRO) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + 
TEST_PROJECTION_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testProjection.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(i + 2)); + vTuple.put(2, DatumFactory.createFloat4(i + 3)); + appender.addTuple(vTuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen()); + + Schema target = new Schema(); + target.addColumn("age", Type.INT8); + target.addColumn("score", Type.FLOAT4); + Scanner scanner = sm.getScanner(meta, schema, fragment, target); + scanner.init(); + int tupleCnt = 0; + Tuple tuple; + while ((tuple = scanner.next()) != null) { + if (storeType == StoreType.RCFILE + || storeType == StoreType.CSV + || storeType == StoreType.PARQUET + || storeType == StoreType.SEQUENCEFILE + || storeType == StoreType.AVRO) { + assertTrue(tuple.get(0) == null); + } + assertTrue(tupleCnt + 2 == tuple.get(1).asInt8()); + assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4()); + tupleCnt++; + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + } + + @Test + public void testVariousTypes() throws IOException { + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + 
schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); + if (storeType == StoreType.AVRO) { + String path = FileUtil.getResourcePath("testVariousTypes.avsc").toString(); + meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path); + } + + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("hyunsik"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("hyunsik"), + DatumFactory.createBlob("hyunsik".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = sm.getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved = scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + } + + @Test + public void testNullHandlingTypes() throws IOException { + Schema schema = new Schema(); + schema.addColumn("col1", 
Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); + meta.putOption(StorageConstants.TEXT_NULL, "\\\\N"); + meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N"); + meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName()); + meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\"); + if (storeType == StoreType.AVRO) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple seedTuple = new VTuple(13); + seedTuple.put(new Datum[]{ + DatumFactory.createBool(true), // 0 + DatumFactory.createBit((byte) 0x99), // 1 + DatumFactory.createChar("hyunsik"), // 2 + DatumFactory.createInt2((short) 17), // 3 + DatumFactory.createInt4(59), // 4 + DatumFactory.createInt8(23l), // 5 + DatumFactory.createFloat4(77.9f), // 6 + DatumFactory.createFloat8(271.9f), // 7 + DatumFactory.createText("hyunsik"), // 8 + 
DatumFactory.createBlob("hyunsik".getBytes()),// 9 + DatumFactory.createInet4("192.168.0.1"), // 10 + NullDatum.get(), // 11 + factory.createDatum(queryid.getProto()) // 12 + }); + + // Making tuples with different null column positions + Tuple tuple; + for (int i = 0; i < 13; i++) { + tuple = new VTuple(13); + for (int j = 0; j < 13; j++) { + if (i == j) { // i'th column will have NULL value + tuple.put(j, NullDatum.get()); + } else { + tuple.put(j, seedTuple.get(j)); + } + } + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + int i = 0; + while ((retrieved = scanner.next()) != null) { + assertEquals(13, retrieved.size()); + for (int j = 0; j < 13; j++) { + if (i == j) { + assertEquals(NullDatum.get(), retrieved.get(j)); + } else { + assertEquals(seedTuple.get(j), retrieved.get(j)); + } + } + + i++; + } + scanner.close(); + } + + @Test + public void testRCFileTextSerializeDeserialize() throws IOException { + if(storeType != StoreType.RCFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, 
options); + meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testRCFileBinarySerializeDeserialize() throws IOException { + if(storeType != StoreType.RCFILE) 
return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), 
status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testSequenceFileTextSerializeDeserialize() throws IOException { + if(storeType != StoreType.SEQUENCEFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = 
ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testSequenceFileBinarySerializeDeserialize() throws IOException { + if(storeType != StoreType.SEQUENCEFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", 
Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + 
assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName()); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testTime() throws IOException { + if (storeType == StoreType.CSV || storeType == StoreType.RAW) { + Schema schema = new Schema(); + schema.addColumn("col1", Type.DATE); + schema.addColumn("col2", Type.TIME); + schema.addColumn("col3", Type.TIMESTAMP); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + + Path tablePath = new Path(testDir, "testTime.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + + Tuple tuple = new VTuple(3); + tuple.put(new Datum[]{ + DatumFactory.createDate("1980-04-01"), + DatumFactory.createTime("12:34:56"), + DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000)) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved = scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + } + } + + @Test + 
public void testSeekableScanner() throws IOException { + if (!seekable) { + return; + } + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("comment", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Seekable.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 100000; + VTuple vTuple; + + List<Long> offsets = Lists.newArrayList(); + offsets.add(0L); + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + vTuple.put(2, DatumFactory.createText("test" + i)); + appender.addTuple(vTuple); + + // find a seek position + if (i % (tupleNum / 3) == 0) { + offsets.add(appender.getOffset()); + } + } + + // end of file + if (!offsets.contains(appender.getOffset())) { + offsets.add(appender.getOffset()); + } + + appender.close(); + if (statsable) { + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + } + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(status.getLen(), appender.getOffset()); + + Scanner scanner; + int tupleCnt = 0; + long prevOffset = 0; + long readBytes = 0; + long readRows = 0; + for (long offset : offsets) { + scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, + new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema); + scanner.init(); + + while (scanner.next() != null) { + tupleCnt++; + } + + scanner.close(); + if (statsable) { + readBytes += scanner.getInputStats().getNumBytes(); + readRows += scanner.getInputStats().getNumRows(); + } + prevOffset = offset; + } + + assertEquals(tupleNum, tupleCnt); + if (statsable) { + 
assertEquals(appender.getStats().getNumBytes().longValue(), readBytes); + assertEquals(appender.getStats().getNumRows().longValue(), readRows); + } + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java new file mode 100644 index 0000000..4f7ea1c --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.avro; + +import org.apache.avro.Schema; +import org.apache.tajo.HttpFileServer; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.NetUtils; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.net.URL; + +import static org.junit.Assert.*; + +/** + * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}. + */ +public class TestAvroUtil { + private Schema expected; + private URL schemaUrl; + + @Before + public void setUp() throws Exception { + schemaUrl = FileUtil.getResourcePath("testVariousTypes.avsc"); + assertNotNull(schemaUrl); + + File file = new File(schemaUrl.getPath()); + assertTrue(file.exists()); + + expected = new Schema.Parser().parse(file); + } + + @Test + public void testGetSchema() throws IOException, URISyntaxException { + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, FileUtil.readTextFile(new File(schemaUrl.getPath()))); + Schema schema = AvroUtil.getAvroSchema(meta, new TajoConf()); + assertEquals(expected, schema); + + meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); + meta.putOption(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath()); + schema = AvroUtil.getAvroSchema(meta, new TajoConf()); + assertEquals(expected, schema); + + HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0")); + try { + server.start(); + InetSocketAddress addr = server.getBindAddress(); + + String url = "http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath(); + meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); + 
meta.putOption(StorageConstants.AVRO_SCHEMA_URL, url); + schema = AvroUtil.getAvroSchema(meta, new TajoConf()); + } finally { + server.stop(); + } + assertEquals(expected, schema); + } + + @Test + public void testGetSchemaFromHttp() throws IOException, URISyntaxException { + HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0")); + try { + server.start(); + InetSocketAddress addr = server.getBindAddress(); + + Schema schema = AvroUtil.getAvroSchemaFromHttp("http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath()); + assertEquals(expected, schema); + } finally { + server.stop(); + } + } + + @Test + public void testGetSchemaFromFileSystem() throws IOException, URISyntaxException { + Schema schema = AvroUtil.getAvroSchemaFromFileSystem(schemaUrl.toString(), new TajoConf()); + + assertEquals(expected, schema); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java new file mode 100644 index 0000000..383740d --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -0,0 +1,947 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.index; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.index.bst.BSTIndex; +import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader; +import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.junit.Assert.*; + +@RunWith(Parameterized.class) +public class TestBSTIndex { + private TajoConf conf; + private Schema schema; + private TableMeta meta; + + private static final int TUPLE_NUM = 10000; + private static final int LOAD_NUM = 100; + private static final String TEST_PATH = "target/test-data/TestIndex"; + private Path testDir; + private FileSystem fs; + private StoreType storeType; + + public TestBSTIndex(StoreType type) { + this.storeType = type; + conf = new TajoConf(); + conf.setVar(TajoConf.ConfVars.ROOT_DIR, TEST_PATH); + schema = new Schema(); + schema.addColumn(new 
Column("int", Type.INT4)); + schema.addColumn(new Column("long", Type.INT8)); + schema.addColumn(new Column("double", Type.FLOAT8)); + schema.addColumn(new Column("float", Type.FLOAT4)); + schema.addColumn(new Column("string", Type.TEXT)); + } + + + @Parameterized.Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][]{ + {StoreType.CSV}, + {StoreType.RAW} + }); + } + + @Before + public void setUp() throws Exception { + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + fs = testDir.getFileSystem(conf); + } + + @Test + public void testFindValue() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testFindValue_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for (int i = 0; i < TUPLE_NUM; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"), + 
BSTIndex.TWO_LEVEL_INDEX, + keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(1)); + keyTuple.put(1, tuple.get(2)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + tuple = new VTuple(keySchema.size()); + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp); + reader.open(); + scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + for (int i = 0; i < TUPLE_NUM - 1; i++) { + tuple.put(0, DatumFactory.createInt8(i)); + tuple.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(tuple); + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8())); + assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8())); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4())); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8())); + } + reader.close(); + scanner.close(); + } + + @Test + public void testBuildIndexWithAppender() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType); + FileAppender appender = (FileAppender) ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(meta, schema, tablePath); + appender.init(); + + SortSpec[] 
sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + Tuple tuple; + long offset; + for (int i = 0; i < TUPLE_NUM; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + + offset = appender.getOffset(); + appender.addTuple(tuple); + creater.write(tuple, offset); + } + appender.flush(); + appender.close(); + + creater.flush(); + creater.close(); + + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + tuple = new VTuple(keySchema.size()); + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + for (int i = 0; i < TUPLE_NUM - 1; i++) { + tuple.put(0, DatumFactory.createInt8(i)); + tuple.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(tuple); + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8())); + assertTrue("[seek check " + (i) + " ]", (i) == 
(tuple.get(2).asFloat8())); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4())); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8())); + } + reader.close(); + scanner.close(); + } + + @Test + public void testFindOmittedValue() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for (int i = 0; i < TUPLE_NUM; i += 2) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen()); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); 
+ + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(1)); + keyTuple.put(1, tuple.get(2)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + for (int i = 1; i < TUPLE_NUM - 1; i += 2) { + keyTuple.put(0, DatumFactory.createInt8(i)); + keyTuple.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(keyTuple); + assertEquals(-1, offsets); + } + reader.close(); + } + + @Test + public void testFindNextKeyValue() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for (int i = 0; i < TUPLE_NUM; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("int", Type.INT4)); + keySchema.addColumn(new Column("long", Type.INT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + 
+ BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(0)); + keyTuple.put(1, tuple.get(1)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple result; + for (int i = 0; i < TUPLE_NUM - 1; i++) { + keyTuple = new VTuple(2); + keyTuple.put(0, DatumFactory.createInt4(i)); + keyTuple.put(1, DatumFactory.createInt8(i)); + long offsets = reader.find(keyTuple, true); + scanner.seek(offsets); + result = scanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", + (i + 1) == (result.get(0).asInt4())); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8())); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + scanner.seek(offsets); + result = scanner.next(); + assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(0).asInt8())); + assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(1).asFloat8())); + } + reader.close(); + scanner.close(); + } + + @Test + public void testFindNextKeyOmittedValue() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType); + Appender appender = 
((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for (int i = 0; i < TUPLE_NUM; i += 2) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("int", Type.INT4)); + keySchema.addColumn(new Column("long", Type.INT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(0)); + keyTuple.put(1, tuple.get(1)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + scanner = FileStorageManager.getSeekableScanner(conf, meta, 
schema, tablet, schema); + scanner.init(); + + Tuple result; + for (int i = 1; i < TUPLE_NUM - 1; i += 2) { + keyTuple = new VTuple(2); + keyTuple.put(0, DatumFactory.createInt4(i)); + keyTuple.put(1, DatumFactory.createInt8(i)); + long offsets = reader.find(keyTuple, true); + scanner.seek(offsets); + result = scanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(0).asInt4())); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8())); + } + scanner.close(); + } + + @Test + public void testFindMinValue() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testFindMinValue" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + + Tuple tuple; + for (int i = 5; i < TUPLE_NUM + 5; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + 
creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(1)); + keyTuple.put(1, tuple.get(2)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + tuple = new VTuple(keySchema.size()); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + tuple.put(0, DatumFactory.createInt8(0)); + tuple.put(1, DatumFactory.createFloat8(0)); + + offset = reader.find(tuple); + assertEquals(-1, offset); + + offset = reader.find(tuple, true); + assertTrue(offset >= 0); + scanner.seek(offset); + tuple = scanner.next(); + assertEquals(5, tuple.get(1).asInt4()); + assertEquals(5l, tuple.get(2).asInt8()); + reader.close(); + scanner.close(); + } + + @Test + public void testMinMax() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testMinMax_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for (int i = 5; i < TUPLE_NUM; i += 2) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new 
FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("int", Type.INT4)); + keySchema.addColumn(new Column("long", Type.INT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(0)); + keyTuple.put(1, tuple.get(1)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + + Tuple min = reader.getFirstKey(); + assertEquals(5, min.get(0).asInt4()); + assertEquals(5l, min.get(0).asInt8()); + + Tuple max = reader.getLastKey(); + assertEquals(TUPLE_NUM - 1, max.get(0).asInt4()); + assertEquals(TUPLE_NUM - 1, max.get(0).asInt8()); + reader.close(); + } + + private class ConcurrentAccessor implements Runnable { + final BSTIndexReader reader; + final Random rnd = new Random(System.currentTimeMillis()); + boolean failed = false; + + ConcurrentAccessor(BSTIndexReader reader) { + this.reader = reader; + } + + public boolean isFailed() { + return this.failed; + } + + @Override + public void run() { + Tuple findKey = new VTuple(2); + int keyVal; + for (int i = 0; i < 10000; 
i++) { + keyVal = rnd.nextInt(10000); + findKey.put(0, DatumFactory.createInt4(keyVal)); + findKey.put(1, DatumFactory.createInt8(keyVal)); + try { + assertTrue(reader.find(findKey) != -1); + } catch (Exception e) { + e.printStackTrace(); + this.failed = true; + } + } + } + } + + @Test + public void testConcurrentAccess() throws IOException, InterruptedException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + + Tuple tuple; + for (int i = 0; i < TUPLE_NUM; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("int", Type.INT4)); + keySchema.addColumn(new Column("long", Type.INT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + 
keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(0)); + keyTuple.put(1, tuple.get(1)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + + /* all five accessors are handed the same reader instance */ Thread[] threads = new Thread[5]; + ConcurrentAccessor[] accs = new ConcurrentAccessor[5]; + for (int i = 0; i < threads.length; i++) { + accs[i] = new ConcurrentAccessor(reader); + threads[i] = new Thread(accs[i]); + threads[i].start(); + } + + /* wait for each worker, then check the failure flag it recorded */ for (int i = 0; i < threads.length; i++) { + threads[i].join(); + assertFalse(accs[i].isFailed()); + } + reader.close(); + } + + + /** Writes TUPLE_NUM rows from TUPLE_NUM - 1 down to 0, indexes them on the "long" and "double" columns, then for each i from TUPLE_NUM - 1 down to 1 checks that find() lands on row i and, when next() returns an offset, that it lands on row i - 1. */ @Test + public void testFindValueDescOrder() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + + Tuple tuple; + for (int i = (TUPLE_NUM - 1); i >= 0; i--) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + /* both SortSpecs pass false as the second argument, where the test that writes rows in increasing order passes true; presumably this is the ascending flag, confirm against SortSpec */ SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), false, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), false, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new 
Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + /* index every row: the key is fields 1 and 2 (the "long" and "double" columns), the value is the offset reported just before the row was read */ while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(1)); + keyTuple.put(1, tuple.get(2)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + /* NOTE(review): 'tuple' serves as both the lookup key and the scan result. After the first iteration it refers to the row returned by scanner.next(), so the put(0)/put(1) calls of later iterations overwrite the first two fields of that row before find() is called. A separate key tuple would avoid this. */ tuple = new VTuple(keySchema.size()); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + for (int i = (TUPLE_NUM - 1); i > 0; i--) { + tuple.put(0, DatumFactory.createInt8(i)); + tuple.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(tuple); + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8())); + assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8())); + + /* -1 from next() is treated as "no further entry" and skips the follow-up check */ offsets = reader.next(); + if (offsets == -1) { + continue; + } + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(0).asInt4())); + assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(1).asInt8())); + } + reader.close(); + scanner.close(); + } + + /** Writes the same descending data set, indexes it on the "int" and "long" columns, checks that the opened reader reports the key schema and comparator it was given, then checks that find(key, true) for key i lands on row i - 1 and that a following next(), when it returns an offset, lands on row i - 2. */ @Test + public void testFindNextKeyValueDescOrder() throws IOException { + meta = 
CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + + /* rows are appended from TUPLE_NUM - 1 down to 0 */ Tuple tuple; + for (int i = (TUPLE_NUM - 1); i >= 0; i--) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("int"), false, false); + sortKeys[1] = new SortSpec(schema.getColumn("long"), false, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("int", Type.INT4)); + keySchema.addColumn(new Column("long", Type.INT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, + "testFindNextKeyValueDescOrder_" + storeType + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + /* NOTE(review): getSeekableScanner is called through StorageManager here but through FileStorageManager in the other tests of this chunk; confirm both resolve to the same method */ SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + /* index every row: the key is fields 0 and 1, the value is the offset reported just before the row was read */ while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(0)); + keyTuple.put(1, tuple.get(1)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, 
"testFindNextKeyValueDescOrder_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + + /* the reader must report the key schema and comparator it was opened with */ assertEquals(keySchema, reader.getKeySchema()); + assertEquals(comp, reader.getComparator()); + + scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple result; + for (int i = (TUPLE_NUM - 1); i > 0; i--) { + keyTuple = new VTuple(2); + keyTuple.put(0, DatumFactory.createInt4(i)); + keyTuple.put(1, DatumFactory.createInt8(i)); + /* with the second argument true, the assertions below expect the row after key i (value i - 1 in this descending file), not row i itself */ long offsets = reader.find(keyTuple, true); + scanner.seek(offsets); + result = scanner.next(); + assertTrue("[seek check " + (i - 1) + " ]", + (i - 1) == (result.get(0).asInt4())); + assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (result.get(1).asInt8())); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + scanner.seek(offsets); + result = scanner.next(); + /* NOTE(review): fields 0 and 1 are read with asInt4()/asInt8() a few lines up but with asInt8()/asFloat8() here; this looks like a copy-paste slip, confirm */ assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(0).asInt8())); + assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(1).asFloat8())); + } + reader.close(); + scanner.close(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java new file mode 100644 index 0000000..d7c9f49 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.storage.index;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

import static org.apache.tajo.storage.CSVFile.CSVScanner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Builds a two-level BST index over a single CSV file and checks that rows can be
 * located through it, both by exact key ({@code find(key)}) and by next key
 * ({@code find(key, true)}).
 *
 * <p>Every test writes {@link #TUPLE_NUM} rows of the form
 * {@code (i, i, i, i, "field_" + i)} in increasing order of {@code i}.
 */
public class TestSingleCSVFileBSTIndex {

  private static final int TUPLE_NUM = 10000;
  private static final int LOAD_NUM = 100;
  private static final String TEST_PATH = "target/test-data/TestSingleCSVFileBSTIndex";

  private TajoConf conf;
  private Schema schema;
  private TableMeta meta;
  private FileSystem fs;
  private Path testDir;

  /** Sets up the configuration and the five-column table schema shared by all tests. */
  public TestSingleCSVFileBSTIndex() {
    conf = new TajoConf();
    conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
    schema = new Schema();
    schema.addColumn(new Column("int", Type.INT4));
    schema.addColumn(new Column("long", Type.INT8));
    schema.addColumn(new Column("double", Type.FLOAT8));
    schema.addColumn(new Column("float", Type.FLOAT4));
    schema.addColumn(new Column("string", Type.TEXT));
  }

  /** Resolves the test directory and the file system that owns it. */
  @Before
  public void setUp() throws Exception {
    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
    fs = testDir.getFileSystem(conf);
  }

  /**
   * Indexes the table on ("long", "double") and checks, for every i in
   * [0, TUPLE_NUM - 1), that {@code find(key i)} seeks to row i and that a
   * following {@code next()}, when it returns an offset, seeks to row i + 1.
   */
  @Test
  public void testFindValueInSingleCSV() throws IOException {
    meta = CatalogUtil.newTableMeta(StoreType.CSV);

    Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
    FileFragment tablet = writeTable(tablePath);

    SortSpec[] sortKeys = new SortSpec[2];
    sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
    sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);

    Schema keySchema = new Schema();
    keySchema.addColumn(new Column("long", Type.INT8));
    keySchema.addColumn(new Column("double", Type.FLOAT8));

    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

    BSTIndex bst = new BSTIndex(conf);
    Path indexPath = new Path(testDir, "FindValueInCSV.idx");
    buildIndex(bst, indexPath, keySchema, comp, tablet, 1, 2);

    BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
    reader.open();
    try {
      SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet);
      try {
        fileScanner.init();
        for (int i = 0; i < TUPLE_NUM - 1; i++) {
          // A fresh key per iteration: the row handed back by the scanner is never
          // modified and never fed back into the index lookup.
          Tuple keyTuple = new VTuple(keySchema.size());
          keyTuple.put(0, DatumFactory.createInt8(i));
          keyTuple.put(1, DatumFactory.createFloat8(i));

          long offsets = reader.find(keyTuple);
          assertTrue("no index entry for key " + i, offsets != -1);
          fileScanner.seek(offsets);
          Tuple result = fileScanner.next();
          assertEquals(i, result.get(1).asInt8());
          assertEquals(i, result.get(2).asFloat8(), 0.01);

          offsets = reader.next();
          if (offsets == -1) {
            continue;
          }
          fileScanner.seek(offsets);
          result = fileScanner.next();
          assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.get(0).asInt4());
          assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.get(1).asInt8());
        }
      } finally {
        fileScanner.close();
      }
    } finally {
      reader.close();
    }
  }

  /**
   * Indexes the table on ("int", "long") and checks, for every i in
   * [0, TUPLE_NUM - 1), that {@code find(key i, true)} seeks to row i + 1 and that
   * a following {@code next()}, when it returns an offset, seeks to row i + 2.
   */
  @Test
  public void testFindNextKeyValueInSingleCSV() throws IOException {
    meta = CatalogUtil.newTableMeta(StoreType.CSV);

    Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV",
        "table1.csv");
    FileFragment tablet = writeTable(tablePath);

    SortSpec[] sortKeys = new SortSpec[2];
    sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
    sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);

    Schema keySchema = new Schema();
    keySchema.addColumn(new Column("int", Type.INT4));
    keySchema.addColumn(new Column("long", Type.INT8));

    BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

    BSTIndex bst = new BSTIndex(conf);
    Path indexPath = new Path(testDir, "FindNextKeyValueInCSV.idx");
    buildIndex(bst, indexPath, keySchema, comp, tablet, 0, 1);

    BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
    reader.open();
    try {
      SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet);
      try {
        fileScanner.init();
        for (int i = 0; i < TUPLE_NUM - 1; i++) {
          Tuple keyTuple = new VTuple(2);
          keyTuple.put(0, DatumFactory.createInt4(i));
          keyTuple.put(1, DatumFactory.createInt8(i));

          long offsets = reader.find(keyTuple, true);
          assertTrue("no next-key entry after key " + i, offsets != -1);
          fileScanner.seek(offsets);
          Tuple result = fileScanner.next();
          assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.get(0).asInt4());
          assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.get(1).asInt8());

          offsets = reader.next();
          if (offsets == -1) {
            continue;
          }
          fileScanner.seek(offsets);
          result = fileScanner.next();
          // Field 0 is the INT4 column and field 1 the INT8 column, so they are read
          // with the matching accessors, as in the assertions above.
          assertEquals("[seek check " + (i + 2) + " ]", i + 2, result.get(0).asInt4());
          assertEquals("[seek check " + (i + 2) + " ]", i + 2, result.get(1).asInt8());
        }
      } finally {
        fileScanner.close();
      }
    } finally {
      reader.close();
    }
  }

  /**
   * Writes TUPLE_NUM rows {@code (i, i, i, i, "field_" + i)} to {@code tablePath}
   * using the current {@code meta} and {@code schema}.
   *
   * @param tablePath file to create; its parent directory is created first
   * @return a fragment named "table1_1" spanning the whole written file
   * @throws IOException if the directory, the appender or the file status call fails
   */
  private FileFragment writeTable(Path tablePath) throws IOException {
    fs.mkdirs(tablePath.getParent());

    Appender appender = ((FileStorageManager) StorageManager.getFileStorageManager(conf))
        .getAppender(meta, schema, tablePath);
    appender.init();
    try {
      for (int i = 0; i < TUPLE_NUM; i++) {
        Tuple tuple = new VTuple(5);
        tuple.put(0, DatumFactory.createInt4(i));
        tuple.put(1, DatumFactory.createInt8(i));
        tuple.put(2, DatumFactory.createFloat8(i));
        tuple.put(3, DatumFactory.createFloat4(i));
        tuple.put(4, DatumFactory.createText("field_" + i));
        appender.addTuple(tuple);
      }
    } finally {
      appender.close();
    }

    FileStatus status = fs.getFileStatus(tablePath);
    return new FileFragment("table1_1", status.getPath(), 0, status.getLen());
  }

  /**
   * Scans {@code tablet} once and writes a two-level BST index to {@code indexPath}.
   * Each row is indexed under a two-field key taken from the given row fields, with
   * the offset reported by the scanner just before the row was read as its value.
   *
   * @param bst            index factory
   * @param indexPath      index file to create
   * @param keySchema      schema of the two-field key
   * @param comp           comparator over {@code keySchema}
   * @param tablet         fragment of the CSV file to index
   * @param firstKeyField  row field copied into key field 0
   * @param secondKeyField row field copied into key field 1
   * @throws IOException if scanning or index writing fails
   */
  private void buildIndex(BSTIndex bst, Path indexPath, Schema keySchema, BaseTupleComparator comp,
                          FileFragment tablet, int firstKeyField, int secondKeyField)
      throws IOException {
    BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
    creater.setLoadNum(LOAD_NUM);
    creater.open();
    try {
      SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet);
      try {
        fileScanner.init();
        while (true) {
          // The offset must be taken before next() advances the scanner.
          long offset = fileScanner.getNextOffset();
          Tuple tuple = fileScanner.next();
          if (tuple == null) {
            break;
          }

          Tuple keyTuple = new VTuple(2);
          keyTuple.put(0, tuple.get(firstKeyField));
          keyTuple.put(1, tuple.get(secondKeyField));
          creater.write(keyTuple, offset);
        }
      } finally {
        fileScanner.close();
      }
      creater.flush();
    } finally {
      creater.close();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java new file mode 100644 index 0000000..109fed9 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tajo.storage.parquet;

import com.google.common.base.Charsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.*;

/**
 * Round-trip test for the Parquet writer and reader: one tuple holding a value for
 * every column of {@link #createAllTypesSchema()} is written with
 * {@code TajoParquetWriter} and read back with {@code TajoParquetReader}.
 */
public class TestReadWrite {
  private static final String HELLO = "hello";

  /**
   * Reserves a unique temporary file name and returns it as a path qualified
   * against the local file system.
   *
   * <p>The empty file made by {@link File#createTempFile} is deleted straight away,
   * so only the path is handed back (presumably so the writer can create the file
   * itself; confirm). {@code deleteOnExit()} is registered on that path first.
   *
   * @return local-file-system path of a file that does not exist yet
   * @throws IOException if the temporary file or the local file system cannot be obtained
   */
  private Path createTmpFile() throws IOException {
    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
    tmp.deleteOnExit();
    tmp.delete();

    // it prevents accessing HDFS namenode of TajoTestingCluster.
    LocalFileSystem localFS = LocalFileSystem.getLocal(new Configuration());
    return localFS.makeQualified(new Path(tmp.getPath()));
  }

  /**
   * Builds a schema with one column for each type exercised by {@link #testAll()};
   * the column order here fixes the field indexes that test uses.
   */
  private Schema createAllTypesSchema() {
    List<Column> columns = new ArrayList<Column>();
    columns.add(new Column("myboolean", Type.BOOLEAN));
    columns.add(new Column("mybit", Type.BIT));
    columns.add(new Column("mychar", Type.CHAR));
    columns.add(new Column("myint2", Type.INT2));
    columns.add(new Column("myint4", Type.INT4));
    columns.add(new Column("myint8", Type.INT8));
    columns.add(new Column("myfloat4", Type.FLOAT4));
    columns.add(new Column("myfloat8", Type.FLOAT8));
    columns.add(new Column("mytext", Type.TEXT));
    columns.add(new Column("myblob", Type.BLOB));
    columns.add(new Column("mynull", Type.NULL_TYPE));
    return new Schema(columns.toArray(new Column[columns.size()]));
  }

  /**
   * Writes a single all-types tuple, reads it back and checks every field.
   * The float and double fields are compared with a delta of zero, i.e. they must
   * survive the round trip exactly.
   */
  @Test
  public void testAll() throws Exception {
    Path file = createTmpFile();
    Schema schema = createAllTypesSchema();
    Tuple tuple = new VTuple(schema.size());
    tuple.put(0, DatumFactory.createBool(true));
    tuple.put(1, DatumFactory.createBit((byte)128));
    tuple.put(2, DatumFactory.createChar('t'));
    tuple.put(3, DatumFactory.createInt2((short)2048));
    tuple.put(4, DatumFactory.createInt4(4096));
    tuple.put(5, DatumFactory.createInt8(8192L));
    tuple.put(6, DatumFactory.createFloat4(0.2f));
    tuple.put(7, DatumFactory.createFloat8(4.1));
    tuple.put(8, DatumFactory.createText(HELLO));
    tuple.put(9, DatumFactory.createBlob(HELLO.getBytes(Charsets.UTF_8)));
    tuple.put(10, NullDatum.get());

    // The writer has to be closed before the file is opened for reading.
    TajoParquetWriter writer = new TajoParquetWriter(file, schema);
    writer.write(tuple);
    writer.close();

    TajoParquetReader reader = new TajoParquetReader(file, schema);
    try {
      Tuple readBack = reader.read();

      assertNotNull(readBack);
      assertTrue(readBack.getBool(0));
      assertEquals((byte)128, readBack.getByte(1));
      assertEquals(String.valueOf('t'), String.valueOf(readBack.getChar(2)));
      assertEquals((short)2048, readBack.getInt2(3));
      assertEquals(4096, readBack.getInt4(4));
      assertEquals(8192L, readBack.getInt8(5));
      assertEquals(0.2f, readBack.getFloat4(6), 0.0);
      assertEquals(4.1, readBack.getFloat8(7), 0.0);
      assertEquals(HELLO, readBack.getText(8));
      assertArrayEquals(HELLO.getBytes(Charsets.UTF_8), readBack.getBytes(9));
      assertEquals(NullDatum.get(), readBack.get(10));
    } finally {
      // Release the reader's file handle even when an assertion fails.
      reader.close();
    }
  }
}
