Repository: tajo Updated Branches: refs/heads/master 758927e5a -> 4561711f0
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java new file mode 100644 index 0000000..383740d --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -0,0 +1,947 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.index; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.index.bst.BSTIndex; +import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader; +import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.junit.Assert.*; + +@RunWith(Parameterized.class) +public class TestBSTIndex { + private TajoConf conf; + private Schema schema; + private TableMeta meta; + + private static final int TUPLE_NUM = 10000; + private static final int LOAD_NUM = 100; + private static final String TEST_PATH = "target/test-data/TestIndex"; + private Path testDir; + private FileSystem fs; + private StoreType storeType; + + public TestBSTIndex(StoreType type) { + this.storeType = type; + conf = new TajoConf(); + conf.setVar(TajoConf.ConfVars.ROOT_DIR, TEST_PATH); + schema = new Schema(); + schema.addColumn(new Column("int", Type.INT4)); + schema.addColumn(new Column("long", Type.INT8)); + schema.addColumn(new Column("double", Type.FLOAT8)); + schema.addColumn(new Column("float", Type.FLOAT4)); + schema.addColumn(new Column("string", Type.TEXT)); + } + + + @Parameterized.Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][]{ + {StoreType.CSV}, + {StoreType.RAW} + }); + } + + 
@Before + public void setUp() throws Exception { + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + fs = testDir.getFileSystem(conf); + } + + @Test + public void testFindValue() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testFindValue_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for (int i = 0; i < TUPLE_NUM; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, + keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(1)); + keyTuple.put(1, 
tuple.get(2)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + tuple = new VTuple(keySchema.size()); + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp); + reader.open(); + scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + for (int i = 0; i < TUPLE_NUM - 1; i++) { + tuple.put(0, DatumFactory.createInt8(i)); + tuple.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(tuple); + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8())); + assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8())); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4())); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8())); + } + reader.close(); + scanner.close(); + } + + @Test + public void testBuildIndexWithAppender() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType); + FileAppender appender = (FileAppender) ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(meta, schema, tablePath); + appender.init(); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + 
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + Tuple tuple; + long offset; + for (int i = 0; i < TUPLE_NUM; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + + offset = appender.getOffset(); + appender.addTuple(tuple); + creater.write(tuple, offset); + } + appender.flush(); + appender.close(); + + creater.flush(); + creater.close(); + + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + tuple = new VTuple(keySchema.size()); + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + for (int i = 0; i < TUPLE_NUM - 1; i++) { + tuple.put(0, DatumFactory.createInt8(i)); + tuple.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(tuple); + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8())); + assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(2).asFloat8())); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4())); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8())); + } + reader.close(); + scanner.close(); + } + + @Test + public void testFindOmittedValue() throws IOException { + meta = 
CatalogUtil.newTableMeta(storeType); + + Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for (int i = 0; i < TUPLE_NUM; i += 2) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen()); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(1)); + keyTuple.put(1, tuple.get(2)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + 
storeType + ".idx"), + keySchema, comp); + reader.open(); + for (int i = 1; i < TUPLE_NUM - 1; i += 2) { + keyTuple.put(0, DatumFactory.createInt8(i)); + keyTuple.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(keyTuple); + assertEquals(-1, offsets); + } + reader.close(); + } + + @Test + public void testFindNextKeyValue() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for (int i = 0; i < TUPLE_NUM; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("int", Type.INT4)); + keySchema.addColumn(new Column("long", Type.INT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + 
keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(0)); + keyTuple.put(1, tuple.get(1)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple result; + for (int i = 0; i < TUPLE_NUM - 1; i++) { + keyTuple = new VTuple(2); + keyTuple.put(0, DatumFactory.createInt4(i)); + keyTuple.put(1, DatumFactory.createInt8(i)); + long offsets = reader.find(keyTuple, true); + scanner.seek(offsets); + result = scanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", + (i + 1) == (result.get(0).asInt4())); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8())); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + scanner.seek(offsets); + result = scanner.next(); + /* Columns 0 and 1 are "int" (INT4) and "long" (INT8): read them with the matching accessors, exactly as the first pair of assertions in this loop does. */ + assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(0).asInt4())); + assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(1).asInt8())); + } + reader.close(); + scanner.close(); + } + + @Test + public void testFindNextKeyOmittedValue() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for (int i = 0; i < TUPLE_NUM; i += 2) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + 
appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("int", Type.INT4)); + keySchema.addColumn(new Column("long", Type.INT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(0)); + keyTuple.put(1, tuple.get(1)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple result; + for (int i = 1; i < TUPLE_NUM - 1; i += 2) { + keyTuple = new VTuple(2); + keyTuple.put(0, DatumFactory.createInt4(i)); + keyTuple.put(1, DatumFactory.createInt8(i)); + long offsets = reader.find(keyTuple, true); + scanner.seek(offsets); + result = scanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(0).asInt4())); + assertTrue("[seek check " + (i + 
1) + " ]", (i + 1) == (result.get(1).asInt8())); + } + /* Close the index reader as well as the scanner; every other test that opens a reader closes it. */ + reader.close(); + scanner.close(); + } + + @Test + public void testFindMinValue() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + /* Table file name uses the same "<test>_<storeType>" pattern as its index file and the other tests. */ + Path tablePath = new Path(testDir, "testFindMinValue_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + + Tuple tuple; + for (int i = 5; i < TUPLE_NUM + 5; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(1)); + keyTuple.put(1, tuple.get(2)); + creater.write(keyTuple, offset); + } + + 
creater.flush(); + creater.close(); + scanner.close(); + + tuple = new VTuple(keySchema.size()); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + tuple.put(0, DatumFactory.createInt8(0)); + tuple.put(1, DatumFactory.createFloat8(0)); + + offset = reader.find(tuple); + assertEquals(-1, offset); + + offset = reader.find(tuple, true); + assertTrue(offset >= 0); + scanner.seek(offset); + tuple = scanner.next(); + /* Columns 1 and 2 are "long" (INT8) and "double" (FLOAT8): compare them at their declared types instead of through narrowing accessors. */ + assertEquals(5L, tuple.get(1).asInt8()); + assertEquals(5.0, tuple.get(2).asFloat8(), 0.0); + reader.close(); + scanner.close(); + } + + @Test + public void testMinMax() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testMinMax_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for (int i = 5; i < TUPLE_NUM; i += 2) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("int", Type.INT4)); + keySchema.addColumn(new Column("long", Type.INT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); 
+ + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(0)); + keyTuple.put(1, tuple.get(1)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + + Tuple min = reader.getFirstKey(); + assertEquals(5, min.get(0).asInt4()); + assertEquals(5l, min.get(0).asInt8()); + + Tuple max = reader.getLastKey(); + assertEquals(TUPLE_NUM - 1, max.get(0).asInt4()); + assertEquals(TUPLE_NUM - 1, max.get(0).asInt8()); + reader.close(); + } + + private class ConcurrentAccessor implements Runnable { + final BSTIndexReader reader; + final Random rnd = new Random(System.currentTimeMillis()); + boolean failed = false; + + ConcurrentAccessor(BSTIndexReader reader) { + this.reader = reader; + } + + public boolean isFailed() { + return this.failed; + } + + @Override + public void run() { + Tuple findKey = new VTuple(2); + int keyVal; + for (int i = 0; i < 10000; i++) { + keyVal = rnd.nextInt(10000); + findKey.put(0, DatumFactory.createInt4(keyVal)); + findKey.put(1, DatumFactory.createInt8(keyVal)); + try { + assertTrue(reader.find(findKey) != -1); + } catch (Exception e) { + e.printStackTrace(); + this.failed = true; + } + } + } + } + + @Test + public void testConcurrentAccess() throws IOException, InterruptedException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, 
"testConcurrentAccess_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + + Tuple tuple; + for (int i = 0; i < TUPLE_NUM; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("int", Type.INT4)); + keySchema.addColumn(new Column("long", Type.INT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(0)); + keyTuple.put(1, tuple.get(1)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + + Thread[] threads = 
new Thread[5]; + ConcurrentAccessor[] accs = new ConcurrentAccessor[5]; + for (int i = 0; i < threads.length; i++) { + accs[i] = new ConcurrentAccessor(reader); + threads[i] = new Thread(accs[i]); + threads[i].start(); + } + + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + assertFalse(accs[i].isFailed()); + } + reader.close(); + } + + + @Test + public void testFindValueDescOrder() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + + Tuple tuple; + for (int i = (TUPLE_NUM - 1); i >= 0; i--) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), false, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), false, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, 
schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(1)); + keyTuple.put(1, tuple.get(2)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + tuple = new VTuple(keySchema.size()); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + for (int i = (TUPLE_NUM - 1); i > 0; i--) { + tuple.put(0, DatumFactory.createInt8(i)); + tuple.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(tuple); + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8())); + assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8())); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(0).asInt4())); + assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(1).asInt8())); + } + reader.close(); + scanner.close(); + } + + @Test + public void testFindNextKeyValueDescOrder() throws IOException { + meta = CatalogUtil.newTableMeta(storeType); + + Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + + Tuple tuple; + for (int i = (TUPLE_NUM - 1); i >= 0; i--) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, 
DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("int"), false, false); + sortKeys[1] = new SortSpec(schema.getColumn("long"), false, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("int", Type.INT4)); + keySchema.addColumn(new Column("long", Type.INT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, + "testFindNextKeyValueDescOrder_" + storeType + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(0)); + keyTuple.put(1, tuple.get(1)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType + ".idx"), + keySchema, comp); + reader.open(); + + assertEquals(keySchema, reader.getKeySchema()); + assertEquals(comp, reader.getComparator()); + + scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); + scanner.init(); + + Tuple result; + for (int i = (TUPLE_NUM - 1); i > 0; i--) { + keyTuple = new VTuple(2); + keyTuple.put(0, DatumFactory.createInt4(i)); + keyTuple.put(1, 
DatumFactory.createInt8(i)); + long offsets = reader.find(keyTuple, true); + scanner.seek(offsets); + result = scanner.next(); + assertTrue("[seek check " + (i - 1) + " ]", + (i - 1) == (result.get(0).asInt4())); + assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (result.get(1).asInt8())); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + scanner.seek(offsets); + result = scanner.next(); + /* Columns 0 and 1 are "int" (INT4) and "long" (INT8): read them with the matching accessors, exactly as the first pair of assertions in this loop does. */ + assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(0).asInt4())); + assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(1).asInt8())); + } + reader.close(); + scanner.close(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java new file mode 100644 index 0000000..d7c9f49 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.index; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.index.bst.BSTIndex; +import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader; +import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.storage.CSVFile.CSVScanner; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestSingleCSVFileBSTIndex { + + private TajoConf conf; + private Schema schema; + private TableMeta meta; + private FileSystem fs; + + private static final int TUPLE_NUM = 10000; + private static final int LOAD_NUM = 100; + private static final String TEST_PATH = "target/test-data/TestSingleCSVFileBSTIndex"; + private Path testDir; + + public TestSingleCSVFileBSTIndex() { + conf = new TajoConf(); + conf.setVar(ConfVars.ROOT_DIR, TEST_PATH); + schema = new Schema(); + schema.addColumn(new Column("int", Type.INT4)); + schema.addColumn(new Column("long", Type.INT8)); + schema.addColumn(new Column("double", Type.FLOAT8)); + schema.addColumn(new Column("float", Type.FLOAT4)); + schema.addColumn(new Column("string", Type.TEXT)); + } + + @Before + public void setUp() throws Exception { + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + fs = 
testDir.getFileSystem(conf); + } + + @Test + public void testFindValueInSingleCSV() throws IOException { + meta = CatalogUtil.newTableMeta(StoreType.CSV); + + Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv"); + fs.mkdirs(tablePath.getParent()); + + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for (int i = 0; i < TUPLE_NUM; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, + "FindValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet); + fileScanner.init(); + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = fileScanner.getNextOffset(); + tuple = fileScanner.next(); + if (tuple == null) + break; + + keyTuple.put(0, tuple.get(1)); + keyTuple.put(1, tuple.get(2)); + creater.write(keyTuple, offset); + } + + 
creater.flush(); + creater.close(); + fileScanner.close(); + + tuple = new VTuple(keySchema.size()); + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, + "FindValueInCSV.idx"), keySchema, comp); + reader.open(); + fileScanner = new CSVScanner(conf, schema, meta, tablet); + fileScanner.init(); + for (int i = 0; i < TUPLE_NUM - 1; i++) { + tuple.put(0, DatumFactory.createInt8(i)); + tuple.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(tuple); + fileScanner.seek(offsets); + tuple = fileScanner.next(); + assertEquals(i, (tuple.get(1).asInt8())); + assertEquals(i, (tuple.get(2).asFloat8()) , 0.01); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + fileScanner.seek(offsets); + tuple = fileScanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", + (i + 1) == (tuple.get(0).asInt4())); + assertTrue("[seek check " + (i + 1) + " ]", + (i + 1) == (tuple.get(1).asInt8())); + } + } + + @Test + public void testFindNextKeyValueInSingleCSV() throws IOException { + meta = CatalogUtil.newTableMeta(StoreType.CSV); + + Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV", + "table1.csv"); + fs.mkdirs(tablePath.getParent()); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + for(int i = 0 ; i < TUPLE_NUM; i ++ ) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec [] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("int"), true, 
false); + sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("int", Type.INT4)); + keySchema.addColumn(new Column("long", Type.INT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"), + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + creater.setLoadNum(LOAD_NUM); + creater.open(); + + SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet); + fileScanner.init(); + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = fileScanner.getNextOffset(); + tuple = fileScanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.get(0)); + keyTuple.put(1, tuple.get(1)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + fileScanner.close(); + + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp); + reader.open(); + fileScanner = new CSVScanner(conf, schema, meta, tablet); + fileScanner.init(); + Tuple result; + for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) { + keyTuple = new VTuple(2); + keyTuple.put(0, DatumFactory.createInt4(i)); + keyTuple.put(1, DatumFactory.createInt8(i)); + long offsets = reader.find(keyTuple, true); + fileScanner.seek(offsets); + result = fileScanner.next(); + assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(0).asInt4())); + assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asInt8())); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + fileScanner.seek(offsets); + result = fileScanner.next(); + assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(0).asInt8())); + assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(1).asFloat8())); + } + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java new file mode 100644 index 0000000..70282d9 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.json; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.Scanner; +import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.fragment.FileFragment; +import org.junit.Test; + +import java.io.IOException; +import java.net.URL; + +import static org.junit.Assert.*; + +public class TestJsonSerDe { + private static Schema schema = new Schema(); + + static { + schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN); + schema.addColumn("col2", TajoDataTypes.Type.CHAR, 7); + schema.addColumn("col3", TajoDataTypes.Type.INT2); + schema.addColumn("col4", TajoDataTypes.Type.INT4); + schema.addColumn("col5", TajoDataTypes.Type.INT8); + schema.addColumn("col6", TajoDataTypes.Type.FLOAT4); + schema.addColumn("col7", TajoDataTypes.Type.FLOAT8); + schema.addColumn("col8", TajoDataTypes.Type.TEXT); + schema.addColumn("col9", TajoDataTypes.Type.BLOB); + schema.addColumn("col10", TajoDataTypes.Type.INET4); + schema.addColumn("col11", TajoDataTypes.Type.NULL_TYPE); + } + + public static Path getResourcePath(String path, String suffix) { + URL resultBaseURL = ClassLoader.getSystemResource(path); + return new Path(resultBaseURL.toString(), suffix); + } + + @Test + public void testVarioutType() throws IOException { + TajoConf conf = new TajoConf(); + + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON); + Path tablePath = new 
Path(getResourcePath("dataset", "TestJsonSerDe"), "testVariousType.json"); + FileSystem fs = FileSystem.getLocal(conf); + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple tuple = scanner.next(); + assertNotNull(tuple); + assertNull(scanner.next()); + scanner.close(); + + Tuple baseTuple = new VTuple(11); + baseTuple.put(new Datum[] { + DatumFactory.createBool(true), // 0 + DatumFactory.createChar("hyunsik"), // 1 + DatumFactory.createInt2((short) 17), // 2 + DatumFactory.createInt4(59), // 3 + DatumFactory.createInt8(23l), // 4 + DatumFactory.createFloat4(77.9f), // 5 + DatumFactory.createFloat8(271.9d), // 6 + DatumFactory.createText("hyunsik"), // 7 + DatumFactory.createBlob("hyunsik".getBytes()), // 8 + DatumFactory.createInet4("192.168.0.1"), // 9 + NullDatum.get(), // 10 + }); + + assertEquals(baseTuple, tuple); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java new file mode 100644 index 0000000..109fed9 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import com.google.common.base.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.*; + +public class TestReadWrite { + private static final String HELLO = "hello"; + + private Path createTmpFile() throws IOException { + File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); + tmp.deleteOnExit(); + tmp.delete(); + + // it prevents accessing HDFS namenode of TajoTestingCluster. 
+ LocalFileSystem localFS = LocalFileSystem.getLocal(new Configuration()); + return localFS.makeQualified(new Path(tmp.getPath())); + } + + private Schema createAllTypesSchema() { + List<Column> columns = new ArrayList<Column>(); + columns.add(new Column("myboolean", Type.BOOLEAN)); + columns.add(new Column("mybit", Type.BIT)); + columns.add(new Column("mychar", Type.CHAR)); + columns.add(new Column("myint2", Type.INT2)); + columns.add(new Column("myint4", Type.INT4)); + columns.add(new Column("myint8", Type.INT8)); + columns.add(new Column("myfloat4", Type.FLOAT4)); + columns.add(new Column("myfloat8", Type.FLOAT8)); + columns.add(new Column("mytext", Type.TEXT)); + columns.add(new Column("myblob", Type.BLOB)); + columns.add(new Column("mynull", Type.NULL_TYPE)); + Column[] columnsArray = new Column[columns.size()]; + columnsArray = columns.toArray(columnsArray); + return new Schema(columnsArray); + } + + @Test + public void testAll() throws Exception { + Path file = createTmpFile(); + Schema schema = createAllTypesSchema(); + Tuple tuple = new VTuple(schema.size()); + tuple.put(0, DatumFactory.createBool(true)); + tuple.put(1, DatumFactory.createBit((byte)128)); + tuple.put(2, DatumFactory.createChar('t')); + tuple.put(3, DatumFactory.createInt2((short)2048)); + tuple.put(4, DatumFactory.createInt4(4096)); + tuple.put(5, DatumFactory.createInt8(8192L)); + tuple.put(6, DatumFactory.createFloat4(0.2f)); + tuple.put(7, DatumFactory.createFloat8(4.1)); + tuple.put(8, DatumFactory.createText(HELLO)); + tuple.put(9, DatumFactory.createBlob(HELLO.getBytes(Charsets.UTF_8))); + tuple.put(10, NullDatum.get()); + + TajoParquetWriter writer = new TajoParquetWriter(file, schema); + writer.write(tuple); + writer.close(); + + TajoParquetReader reader = new TajoParquetReader(file, schema); + tuple = reader.read(); + + assertNotNull(tuple); + assertEquals(true, tuple.getBool(0)); + assertEquals((byte)128, tuple.getByte(1)); + 
assertTrue(String.valueOf('t').equals(String.valueOf(tuple.getChar(2)))); + assertEquals((short)2048, tuple.getInt2(3)); + assertEquals(4096, tuple.getInt4(4)); + assertEquals(8192L, tuple.getInt8(5)); + assertEquals(new Float(0.2f), new Float(tuple.getFloat4(6))); + assertEquals(new Double(4.1), new Double(tuple.getFloat8(7))); + assertTrue(HELLO.equals(tuple.getText(8))); + assertArrayEquals(HELLO.getBytes(Charsets.UTF_8), tuple.getBytes(9)); + assertEquals(NullDatum.get(), tuple.get(10)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java new file mode 100644 index 0000000..517e00e --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.parquet; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.junit.Test; +import parquet.schema.MessageType; +import parquet.schema.MessageTypeParser; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link TajoSchemaConverter}. + */ +public class TestSchemaConverter { + private static final String ALL_PARQUET_SCHEMA = + "message table_schema {\n" + + " optional boolean myboolean;\n" + + " optional int32 myint;\n" + + " optional int64 mylong;\n" + + " optional float myfloat;\n" + + " optional double mydouble;\n" + + " optional binary mybytes;\n" + + " optional binary mystring (UTF8);\n" + + " optional fixed_len_byte_array(1) myfixed;\n" + + "}\n"; + + private static final String CONVERTED_ALL_PARQUET_SCHEMA = + "message table_schema {\n" + + " optional boolean myboolean;\n" + + " optional int32 mybit;\n" + + " optional binary mychar (UTF8);\n" + + " optional int32 myint2;\n" + + " optional int32 myint4;\n" + + " optional int64 myint8;\n" + + " optional float myfloat4;\n" + + " optional double myfloat8;\n" + + " optional binary mytext (UTF8);\n" + + " optional binary myblob;\n" + + // NULL_TYPE fields are not encoded. 
+ " optional binary myinet4;\n" + + " optional binary myprotobuf;\n" + + "}\n"; + + private Schema createAllTypesSchema() { + List<Column> columns = new ArrayList<Column>(); + columns.add(new Column("myboolean", Type.BOOLEAN)); + columns.add(new Column("mybit", Type.BIT)); + columns.add(new Column("mychar", Type.CHAR)); + columns.add(new Column("myint2", Type.INT2)); + columns.add(new Column("myint4", Type.INT4)); + columns.add(new Column("myint8", Type.INT8)); + columns.add(new Column("myfloat4", Type.FLOAT4)); + columns.add(new Column("myfloat8", Type.FLOAT8)); + columns.add(new Column("mytext", Type.TEXT)); + columns.add(new Column("myblob", Type.BLOB)); + columns.add(new Column("mynull", Type.NULL_TYPE)); + columns.add(new Column("myinet4", Type.INET4)); + columns.add(new Column("myprotobuf", Type.PROTOBUF)); + Column[] columnsArray = new Column[columns.size()]; + columnsArray = columns.toArray(columnsArray); + return new Schema(columnsArray); + } + + private Schema createAllTypesConvertedSchema() { + List<Column> columns = new ArrayList<Column>(); + columns.add(new Column("myboolean", Type.BOOLEAN)); + columns.add(new Column("myint", Type.INT4)); + columns.add(new Column("mylong", Type.INT8)); + columns.add(new Column("myfloat", Type.FLOAT4)); + columns.add(new Column("mydouble", Type.FLOAT8)); + columns.add(new Column("mybytes", Type.BLOB)); + columns.add(new Column("mystring", Type.TEXT)); + columns.add(new Column("myfixed", Type.BLOB)); + Column[] columnsArray = new Column[columns.size()]; + columnsArray = columns.toArray(columnsArray); + return new Schema(columnsArray); + } + + private void testTajoToParquetConversion( + Schema tajoSchema, String schemaString) throws Exception { + TajoSchemaConverter converter = new TajoSchemaConverter(); + MessageType schema = converter.convert(tajoSchema); + MessageType expected = MessageTypeParser.parseMessageType(schemaString); + assertEquals("converting " + schema + " to " + schemaString, + expected.toString(), 
schema.toString()); + } + + private void testParquetToTajoConversion( + Schema tajoSchema, String schemaString) throws Exception { + TajoSchemaConverter converter = new TajoSchemaConverter(); + Schema schema = converter.convert( + MessageTypeParser.parseMessageType(schemaString)); + assertEquals("converting " + schemaString + " to " + tajoSchema, + tajoSchema.toString(), schema.toString()); + } + + @Test + public void testAllTypesToParquet() throws Exception { + Schema schema = createAllTypesSchema(); + testTajoToParquetConversion(schema, CONVERTED_ALL_PARQUET_SCHEMA); + } + + @Test + public void testAllTypesToTajo() throws Exception { + Schema schema = createAllTypesConvertedSchema(); + testParquetToTajoConversion(schema, ALL_PARQUET_SCHEMA); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json new file mode 100644 index 0000000..739dfe7 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json @@ -0,0 +1,6 @@ +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1" +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, 
"col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1" +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1" +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json new file mode 100644 index 0000000..8256b72 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json @@ -0,0 +1,4 @@ +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1" +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json ---------------------------------------------------------------------- diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json new file mode 100644 index 0000000..8ee3408 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json @@ -0,0 +1 @@ +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt new file mode 100644 index 0000000..7403c26 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt @@ -0,0 +1,2 @@ +1|25|emiya muljomdao +2|25|emiya muljomdao \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc new file mode 100644 index 0000000..d4250a9 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc @@ -0,0 +1,20 @@ +{ + "type": "record", + "namespace": "org.apache.tajo", + "name": "testVariousTypes", + "fields": [ + { "name": "col1", "type": "boolean" }, + { "name": "col2", "type": "string" }, + { "name": "col3", "type": "int" }, + { "name": "col4", "type": "int" }, + { "name": "col5", "type": "long" }, + { "name": "col6", "type": "float" }, 
+ { "name": "col7", "type": "double" }, + { "name": "col8", "type": "string" }, + { "name": "col9", "type": "bytes" }, + { "name": "col10", "type": "bytes" }, + { "name": "col11", "type": "null" }, + { "name": "col12", "type": "bytes" } + ] +} + http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml new file mode 100644 index 0000000..737284b --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml @@ -0,0 +1,178 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<configuration> + <property> + <name>fs.s3.impl</name> + <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value> + </property> + + <!-- Storage Manager Configuration --> + <property> + <name>tajo.storage.manager.hdfs.class</name> + <value>org.apache.tajo.storage.FileStorageManager</value> + </property> + <property> + <name>tajo.storage.manager.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value> + </property> + + <!--- Registered Scanner Handler --> + <property> + <name>tajo.storage.scanner-handler</name> + <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value> + </property> + + <!--- Fragment Class Configurations --> + <property> + <name>tajo.storage.fragment.textfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.csv.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.json.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.raw.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.rcfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.row.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.parquet.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.sequencefile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.avro.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + + <!--- Scanner Handler --> + <property> + 
<name>tajo.storage.scanner-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.json.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroScanner</value> + </property> + + <!--- Appender Handler --> + <property> + <name>tajo.storage.appender-handler</name> + <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value> + </property> + + <property> + <name>tajo.storage.appender-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.json.class</name> + 
<value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroAppender</value> + </property> +</configuration>
