http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java deleted file mode 100644 index d2cfd82..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ /dev/null @@ -1,946 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.index; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.index.bst.BSTIndex; -import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader; -import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter; -import org.apache.tajo.util.CommonTestingUtil; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Random; - -import static org.junit.Assert.*; - -@RunWith(Parameterized.class) -public class TestBSTIndex { - private TajoConf conf; - private Schema schema; - private TableMeta meta; - - private static final int TUPLE_NUM = 10000; - private static final int LOAD_NUM = 100; - private static final String TEST_PATH = "target/test-data/TestIndex"; - private Path testDir; - private FileSystem fs; - private StoreType storeType; - - public TestBSTIndex(StoreType type) { - this.storeType = type; - conf = new TajoConf(); - conf.setVar(TajoConf.ConfVars.ROOT_DIR, TEST_PATH); - schema = new Schema(); - schema.addColumn(new Column("int", Type.INT4)); - schema.addColumn(new Column("long", Type.INT8)); - schema.addColumn(new Column("double", Type.FLOAT8)); - schema.addColumn(new Column("float", Type.FLOAT4)); - schema.addColumn(new Column("string", Type.TEXT)); - } - - - @Parameterized.Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][]{ - {StoreType.CSV}, - {StoreType.RAW} - }); - } - - 
@Before - public void setUp() throws Exception { - testDir = CommonTestingUtil.getTestDir(TEST_PATH); - fs = testDir.getFileSystem(conf); - } - - @Test - public void testFindValue() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindValue_" + storeType); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for (int i = 0; i < TUPLE_NUM; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("long", Type.INT8)); - keySchema.addColumn(new Column("double", Type.FLOAT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, - keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(1)); - keyTuple.put(1, tuple.get(2)); - 
creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - tuple = new VTuple(keySchema.size()); - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp); - reader.open(); - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - for (int i = 0; i < TUPLE_NUM - 1; i++) { - tuple.put(0, DatumFactory.createInt8(i)); - tuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(tuple); - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8())); - assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8())); - - offsets = reader.next(); - if (offsets == -1) { - continue; - } - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8())); - } - reader.close(); - scanner.close(); - } - - @Test - public void testBuildIndexWithAppender() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType); - FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(meta, schema, - tablePath); - appender.init(); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("long", Type.INT8)); - keySchema.addColumn(new Column("double", Type.FLOAT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new 
Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - Tuple tuple; - long offset; - for (int i = 0; i < TUPLE_NUM; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - - offset = appender.getOffset(); - appender.addTuple(tuple); - creater.write(tuple, offset); - } - appender.flush(); - appender.close(); - - creater.flush(); - creater.close(); - - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - tuple = new VTuple(keySchema.size()); - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - for (int i = 0; i < TUPLE_NUM - 1; i++) { - tuple.put(0, DatumFactory.createInt8(i)); - tuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(tuple); - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(1).asInt8())); - assertTrue("[seek check " + (i) + " ]", (i) == (tuple.get(2).asFloat8())); - - offsets = reader.next(); - if (offsets == -1) { - continue; - } - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.get(1).asInt8())); - } - reader.close(); - scanner.close(); - } - - @Test - public void testFindOmittedValue() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = 
StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for (int i = 0; i < TUPLE_NUM; i += 2) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, status.getLen()); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("long", Type.INT8)); - keySchema.addColumn(new Column("double", Type.FLOAT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(1)); - keyTuple.put(1, tuple.get(2)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - for (int i = 1; i < TUPLE_NUM - 
1; i += 2) { - keyTuple.put(0, DatumFactory.createInt8(i)); - keyTuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(keyTuple); - assertEquals(-1, offsets); - } - reader.close(); - } - - @Test - public void testFindNextKeyValue() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for (int i = 0; i < TUPLE_NUM; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("int", Type.INT4)); - keySchema.addColumn(new Column("long", Type.INT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - 
keyTuple.put(0, tuple.get(0)); - keyTuple.put(1, tuple.get(1)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple result; - for (int i = 0; i < TUPLE_NUM - 1; i++) { - keyTuple = new VTuple(2); - keyTuple.put(0, DatumFactory.createInt4(i)); - keyTuple.put(1, DatumFactory.createInt8(i)); - long offsets = reader.find(keyTuple, true); - scanner.seek(offsets); - result = scanner.next(); - assertTrue("[seek check " + (i + 1) + " ]", - (i + 1) == (result.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8())); - - offsets = reader.next(); - if (offsets == -1) { - continue; - } - scanner.seek(offsets); - result = scanner.next(); - assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(0).asInt8())); - assertTrue("[seek check " + (i + 2) + " ]", (i + 2) == (result.get(1).asFloat8())); - } - reader.close(); - scanner.close(); - } - - @Test - public void testFindNextKeyOmittedValue() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for (int i = 0; i < TUPLE_NUM; i += 2) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment 
tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("int", Type.INT4)); - keySchema.addColumn(new Column("long", Type.INT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(0)); - keyTuple.put(1, tuple.get(1)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple result; - for (int i = 1; i < TUPLE_NUM - 1; i += 2) { - keyTuple = new VTuple(2); - keyTuple.put(0, DatumFactory.createInt4(i)); - keyTuple.put(1, DatumFactory.createInt8(i)); - long offsets = reader.find(keyTuple, true); - scanner.seek(offsets); - result = scanner.next(); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (result.get(1).asInt8())); - } - scanner.close(); - } - - @Test - public void testFindMinValue() throws IOException { - meta = 
CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindMinValue" + storeType); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - - Tuple tuple; - for (int i = 5; i < TUPLE_NUM + 5; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("long", Type.INT8)); - keySchema.addColumn(new Column("double", Type.FLOAT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(1)); - keyTuple.put(1, tuple.get(2)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - tuple = new VTuple(keySchema.size()); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, 
"testFindMinValue_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - tuple.put(0, DatumFactory.createInt8(0)); - tuple.put(1, DatumFactory.createFloat8(0)); - - offset = reader.find(tuple); - assertEquals(-1, offset); - - offset = reader.find(tuple, true); - assertTrue(offset >= 0); - scanner.seek(offset); - tuple = scanner.next(); - assertEquals(5, tuple.get(1).asInt4()); - assertEquals(5l, tuple.get(2).asInt8()); - reader.close(); - scanner.close(); - } - - @Test - public void testMinMax() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testMinMax_" + storeType); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for (int i = 5; i < TUPLE_NUM; i += 2) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("int", Type.INT4)); - keySchema.addColumn(new Column("long", Type.INT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - 
creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(0)); - keyTuple.put(1, tuple.get(1)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - - Tuple min = reader.getFirstKey(); - assertEquals(5, min.get(0).asInt4()); - assertEquals(5l, min.get(0).asInt8()); - - Tuple max = reader.getLastKey(); - assertEquals(TUPLE_NUM - 1, max.get(0).asInt4()); - assertEquals(TUPLE_NUM - 1, max.get(0).asInt8()); - reader.close(); - } - - private class ConcurrentAccessor implements Runnable { - final BSTIndexReader reader; - final Random rnd = new Random(System.currentTimeMillis()); - boolean failed = false; - - ConcurrentAccessor(BSTIndexReader reader) { - this.reader = reader; - } - - public boolean isFailed() { - return this.failed; - } - - @Override - public void run() { - Tuple findKey = new VTuple(2); - int keyVal; - for (int i = 0; i < 10000; i++) { - keyVal = rnd.nextInt(10000); - findKey.put(0, DatumFactory.createInt4(keyVal)); - findKey.put(1, DatumFactory.createInt8(keyVal)); - try { - assertTrue(reader.find(findKey) != -1); - } catch (Exception e) { - e.printStackTrace(); - this.failed = true; - } - } - } - } - - @Test - public void testConcurrentAccess() throws IOException, InterruptedException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - - Tuple tuple; - for (int i = 0; i < 
TUPLE_NUM; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("int", Type.INT4)); - keySchema.addColumn(new Column("long", Type.INT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(0)); - keyTuple.put(1, tuple.get(1)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - - Thread[] threads = new Thread[5]; - ConcurrentAccessor[] accs = new ConcurrentAccessor[5]; - for (int i = 0; i < threads.length; i++) { - accs[i] = new ConcurrentAccessor(reader); - threads[i] = new Thread(accs[i]); - threads[i].start(); - } - 
- for (int i = 0; i < threads.length; i++) { - threads[i].join(); - assertFalse(accs[i].isFailed()); - } - reader.close(); - } - - - @Test - public void testFindValueDescOrder() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - - Tuple tuple; - for (int i = (TUPLE_NUM - 1); i >= 0; i--) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("long"), false, false); - sortKeys[1] = new SortSpec(schema.getColumn("double"), false, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("long", Type.INT8)); - keySchema.addColumn(new Column("double", Type.FLOAT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(1)); - keyTuple.put(1, 
tuple.get(2)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - tuple = new VTuple(keySchema.size()); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - for (int i = (TUPLE_NUM - 1); i > 0; i--) { - tuple.put(0, DatumFactory.createInt8(i)); - tuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(tuple); - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("seek check [" + (i) + " ," + (tuple.get(1).asInt8()) + "]", (i) == (tuple.get(1).asInt8())); - assertTrue("seek check [" + (i) + " ," + (tuple.get(2).asFloat8()) + "]", (i) == (tuple.get(2).asFloat8())); - - offsets = reader.next(); - if (offsets == -1) { - continue; - } - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(0).asInt4())); - assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (tuple.get(1).asInt8())); - } - reader.close(); - scanner.close(); - } - - @Test - public void testFindNextKeyValueDescOrder() throws IOException { - meta = CatalogUtil.newTableMeta(storeType); - - Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - - Tuple tuple; - for (int i = (TUPLE_NUM - 1); i >= 0; i--) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = 
new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("int"), false, false); - sortKeys[1] = new SortSpec(schema.getColumn("long"), false, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("int", Type.INT4)); - keySchema.addColumn(new Column("long", Type.INT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, - "testFindNextKeyValueDescOrder_" + storeType + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = scanner.getNextOffset(); - tuple = scanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(0)); - keyTuple.put(1, tuple.get(1)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - scanner.close(); - - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType + ".idx"), - keySchema, comp); - reader.open(); - - assertEquals(keySchema, reader.getKeySchema()); - assertEquals(comp, reader.getComparator()); - - scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); - scanner.init(); - - Tuple result; - for (int i = (TUPLE_NUM - 1); i > 0; i--) { - keyTuple = new VTuple(2); - keyTuple.put(0, DatumFactory.createInt4(i)); - keyTuple.put(1, DatumFactory.createInt8(i)); - long offsets = reader.find(keyTuple, true); - scanner.seek(offsets); - result = scanner.next(); - assertTrue("[seek check " + (i - 1) + " ]", - (i - 1) == (result.get(0).asInt4())); - assertTrue("[seek check " + (i - 1) + " ]", (i - 1) == (result.get(1).asInt8())); - - offsets 
= reader.next(); - if (offsets == -1) { - continue; - } - scanner.seek(offsets); - result = scanner.next(); - assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(0).asInt8())); - assertTrue("[seek check " + (i - 2) + " ]", (i - 2) == (result.get(1).asFloat8())); - } - reader.close(); - scanner.close(); - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java deleted file mode 100644 index 1081ae9..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.index; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.index.bst.BSTIndex; -import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader; -import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter; -import org.apache.tajo.util.CommonTestingUtil; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; - -import static org.apache.tajo.storage.CSVFile.CSVScanner; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestSingleCSVFileBSTIndex { - - private TajoConf conf; - private Schema schema; - private TableMeta meta; - private FileSystem fs; - - private static final int TUPLE_NUM = 10000; - private static final int LOAD_NUM = 100; - private static final String TEST_PATH = "target/test-data/TestSingleCSVFileBSTIndex"; - private Path testDir; - - public TestSingleCSVFileBSTIndex() { - conf = new TajoConf(); - conf.setVar(ConfVars.ROOT_DIR, TEST_PATH); - schema = new Schema(); - schema.addColumn(new Column("int", Type.INT4)); - schema.addColumn(new Column("long", Type.INT8)); - schema.addColumn(new Column("double", Type.FLOAT8)); - schema.addColumn(new Column("float", Type.FLOAT4)); - schema.addColumn(new Column("string", Type.TEXT)); - } - - @Before - public void setUp() throws Exception { - testDir = CommonTestingUtil.getTestDir(TEST_PATH); - fs = testDir.getFileSystem(conf); - } - - @Test - public void testFindValueInSingleCSV() throws IOException { - meta = 
CatalogUtil.newTableMeta(StoreType.CSV); - - Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv"); - fs.mkdirs(tablePath.getParent()); - - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for (int i = 0; i < TUPLE_NUM; i++) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec[] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new Column("long", Type.INT8)); - keySchema.addColumn(new Column("double", Type.FLOAT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, - "FindValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet); - fileScanner.init(); - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = fileScanner.getNextOffset(); - tuple = fileScanner.next(); - if (tuple == null) - break; - - keyTuple.put(0, tuple.get(1)); - keyTuple.put(1, tuple.get(2)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - fileScanner.close(); - - tuple = new VTuple(keySchema.size()); - BSTIndexReader reader = 
bst.getIndexReader(new Path(testDir, - "FindValueInCSV.idx"), keySchema, comp); - reader.open(); - fileScanner = new CSVScanner(conf, schema, meta, tablet); - fileScanner.init(); - for (int i = 0; i < TUPLE_NUM - 1; i++) { - tuple.put(0, DatumFactory.createInt8(i)); - tuple.put(1, DatumFactory.createFloat8(i)); - long offsets = reader.find(tuple); - fileScanner.seek(offsets); - tuple = fileScanner.next(); - assertEquals(i, (tuple.get(1).asInt8())); - assertEquals(i, (tuple.get(2).asFloat8()) , 0.01); - - offsets = reader.next(); - if (offsets == -1) { - continue; - } - fileScanner.seek(offsets); - tuple = fileScanner.next(); - assertTrue("[seek check " + (i + 1) + " ]", - (i + 1) == (tuple.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]", - (i + 1) == (tuple.get(1).asInt8())); - } - } - - @Test - public void testFindNextKeyValueInSingleCSV() throws IOException { - meta = CatalogUtil.newTableMeta(StoreType.CSV); - - Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV", - "table1.csv"); - fs.mkdirs(tablePath.getParent()); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple; - for(int i = 0 ; i < TUPLE_NUM; i ++ ) { - tuple = new VTuple(5); - tuple.put(0, DatumFactory.createInt4(i)); - tuple.put(1, DatumFactory.createInt8(i)); - tuple.put(2, DatumFactory.createFloat8(i)); - tuple.put(3, DatumFactory.createFloat4(i)); - tuple.put(4, DatumFactory.createText("field_" + i)); - appender.addTuple(tuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); - - SortSpec [] sortKeys = new SortSpec[2]; - sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false); - sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false); - - Schema keySchema = new Schema(); - keySchema.addColumn(new 
Column("int", Type.INT4)); - keySchema.addColumn(new Column("long", Type.INT8)); - - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); - - BSTIndex bst = new BSTIndex(conf); - BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - creater.setLoadNum(LOAD_NUM); - creater.open(); - - SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet); - fileScanner.init(); - Tuple keyTuple; - long offset; - while (true) { - keyTuple = new VTuple(2); - offset = fileScanner.getNextOffset(); - tuple = fileScanner.next(); - if (tuple == null) break; - - keyTuple.put(0, tuple.get(0)); - keyTuple.put(1, tuple.get(1)); - creater.write(keyTuple, offset); - } - - creater.flush(); - creater.close(); - fileScanner.close(); - - BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp); - reader.open(); - fileScanner = new CSVScanner(conf, schema, meta, tablet); - fileScanner.init(); - Tuple result; - for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) { - keyTuple = new VTuple(2); - keyTuple.put(0, DatumFactory.createInt4(i)); - keyTuple.put(1, DatumFactory.createInt8(i)); - long offsets = reader.find(keyTuple, true); - fileScanner.seek(offsets); - result = fileScanner.next(); - assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(0).asInt4())); - assertTrue("[seek check " + (i + 1) + " ]" , (i + 1) == (result.get(1).asInt8())); - - offsets = reader.next(); - if (offsets == -1) { - continue; - } - fileScanner.seek(offsets); - result = fileScanner.next(); - assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(0).asInt8())); - assertTrue("[seek check " + (i + 2) + " ]" , (i + 2) == (result.get(1).asFloat8())); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java b/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java deleted file mode 100644 index 0a01dc4..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.parquet; - -import com.google.common.base.Charsets; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertNotNull; - -public class TestReadWrite { - private static final String HELLO = "hello"; - - private Path createTmpFile() throws IOException { - File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); - tmp.deleteOnExit(); - tmp.delete(); - - // it prevents accessing HDFS namenode of TajoTestingCluster. 
- LocalFileSystem localFS = LocalFileSystem.getLocal(new Configuration()); - return localFS.makeQualified(new Path(tmp.getPath())); - } - - private Schema createAllTypesSchema() { - List<Column> columns = new ArrayList<Column>(); - columns.add(new Column("myboolean", Type.BOOLEAN)); - columns.add(new Column("mybit", Type.BIT)); - columns.add(new Column("mychar", Type.CHAR)); - columns.add(new Column("myint2", Type.INT2)); - columns.add(new Column("myint4", Type.INT4)); - columns.add(new Column("myint8", Type.INT8)); - columns.add(new Column("myfloat4", Type.FLOAT4)); - columns.add(new Column("myfloat8", Type.FLOAT8)); - columns.add(new Column("mytext", Type.TEXT)); - columns.add(new Column("myblob", Type.BLOB)); - columns.add(new Column("mynull", Type.NULL_TYPE)); - Column[] columnsArray = new Column[columns.size()]; - columnsArray = columns.toArray(columnsArray); - return new Schema(columnsArray); - } - - @Test - public void testAll() throws Exception { - Path file = createTmpFile(); - Schema schema = createAllTypesSchema(); - Tuple tuple = new VTuple(schema.size()); - tuple.put(0, DatumFactory.createBool(true)); - tuple.put(1, DatumFactory.createBit((byte)128)); - tuple.put(2, DatumFactory.createChar('t')); - tuple.put(3, DatumFactory.createInt2((short)2048)); - tuple.put(4, DatumFactory.createInt4(4096)); - tuple.put(5, DatumFactory.createInt8(8192L)); - tuple.put(6, DatumFactory.createFloat4(0.2f)); - tuple.put(7, DatumFactory.createFloat8(4.1)); - tuple.put(8, DatumFactory.createText(HELLO)); - tuple.put(9, DatumFactory.createBlob(HELLO.getBytes(Charsets.UTF_8))); - tuple.put(10, NullDatum.get()); - - TajoParquetWriter writer = new TajoParquetWriter(file, schema); - writer.write(tuple); - writer.close(); - - TajoParquetReader reader = new TajoParquetReader(file, schema); - tuple = reader.read(); - - assertNotNull(tuple); - assertEquals(true, tuple.getBool(0)); - assertEquals((byte)128, tuple.getByte(1)); - 
assertTrue(String.valueOf('t').equals(String.valueOf(tuple.getChar(2)))); - assertEquals((short)2048, tuple.getInt2(3)); - assertEquals(4096, tuple.getInt4(4)); - assertEquals(8192L, tuple.getInt8(5)); - assertEquals(new Float(0.2f), new Float(tuple.getFloat4(6))); - assertEquals(new Double(4.1), new Double(tuple.getFloat8(7))); - assertTrue(HELLO.equals(tuple.getText(8))); - assertArrayEquals(HELLO.getBytes(Charsets.UTF_8), tuple.getBytes(9)); - assertEquals(NullDatum.get(), tuple.get(10)); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java deleted file mode 100644 index 49a162b..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.parquet; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.common.TajoDataTypes.Type; - -import org.junit.Test; - -import parquet.schema.MessageType; -import parquet.schema.MessageTypeParser; - -import static org.junit.Assert.assertEquals; - -/** - * Tests for {@link TajoSchemaConverter}. - */ -public class TestSchemaConverter { - private static final String ALL_PARQUET_SCHEMA = - "message table_schema {\n" + - " optional boolean myboolean;\n" + - " optional int32 myint;\n" + - " optional int64 mylong;\n" + - " optional float myfloat;\n" + - " optional double mydouble;\n" + - " optional binary mybytes;\n" + - " optional binary mystring (UTF8);\n" + - " optional fixed_len_byte_array(1) myfixed;\n" + - "}\n"; - - private static final String CONVERTED_ALL_PARQUET_SCHEMA = - "message table_schema {\n" + - " optional boolean myboolean;\n" + - " optional int32 mybit;\n" + - " optional binary mychar (UTF8);\n" + - " optional int32 myint2;\n" + - " optional int32 myint4;\n" + - " optional int64 myint8;\n" + - " optional float myfloat4;\n" + - " optional double myfloat8;\n" + - " optional binary mytext (UTF8);\n" + - " optional binary myblob;\n" + - // NULL_TYPE fields are not encoded. 
- " optional binary myinet4;\n" + - " optional binary myprotobuf;\n" + - "}\n"; - - private Schema createAllTypesSchema() { - List<Column> columns = new ArrayList<Column>(); - columns.add(new Column("myboolean", Type.BOOLEAN)); - columns.add(new Column("mybit", Type.BIT)); - columns.add(new Column("mychar", Type.CHAR)); - columns.add(new Column("myint2", Type.INT2)); - columns.add(new Column("myint4", Type.INT4)); - columns.add(new Column("myint8", Type.INT8)); - columns.add(new Column("myfloat4", Type.FLOAT4)); - columns.add(new Column("myfloat8", Type.FLOAT8)); - columns.add(new Column("mytext", Type.TEXT)); - columns.add(new Column("myblob", Type.BLOB)); - columns.add(new Column("mynull", Type.NULL_TYPE)); - columns.add(new Column("myinet4", Type.INET4)); - columns.add(new Column("myprotobuf", Type.PROTOBUF)); - Column[] columnsArray = new Column[columns.size()]; - columnsArray = columns.toArray(columnsArray); - return new Schema(columnsArray); - } - - private Schema createAllTypesConvertedSchema() { - List<Column> columns = new ArrayList<Column>(); - columns.add(new Column("myboolean", Type.BOOLEAN)); - columns.add(new Column("myint", Type.INT4)); - columns.add(new Column("mylong", Type.INT8)); - columns.add(new Column("myfloat", Type.FLOAT4)); - columns.add(new Column("mydouble", Type.FLOAT8)); - columns.add(new Column("mybytes", Type.BLOB)); - columns.add(new Column("mystring", Type.TEXT)); - columns.add(new Column("myfixed", Type.BLOB)); - Column[] columnsArray = new Column[columns.size()]; - columnsArray = columns.toArray(columnsArray); - return new Schema(columnsArray); - } - - private void testTajoToParquetConversion( - Schema tajoSchema, String schemaString) throws Exception { - TajoSchemaConverter converter = new TajoSchemaConverter(); - MessageType schema = converter.convert(tajoSchema); - MessageType expected = MessageTypeParser.parseMessageType(schemaString); - assertEquals("converting " + schema + " to " + schemaString, - expected.toString(), 
schema.toString()); - } - - private void testParquetToTajoConversion( - Schema tajoSchema, String schemaString) throws Exception { - TajoSchemaConverter converter = new TajoSchemaConverter(); - Schema schema = converter.convert( - MessageTypeParser.parseMessageType(schemaString)); - assertEquals("converting " + schemaString + " to " + tajoSchema, - tajoSchema.toString(), schema.toString()); - } - - @Test - public void testAllTypesToParquet() throws Exception { - Schema schema = createAllTypesSchema(); - testTajoToParquetConversion(schema, CONVERTED_ALL_PARQUET_SCHEMA); - } - - @Test - public void testAllTypesToTajo() throws Exception { - Schema schema = createAllTypesConvertedSchema(); - testParquetToTajoConversion(schema, ALL_PARQUET_SCHEMA); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java deleted file mode 100644 index 7b09937..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.s3; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.s3.Block; -import org.apache.hadoop.io.IOUtils; - -import java.io.*; - -/** - * Holds file metadata including type (regular file, or directory), - * and the list of blocks that are pointers to the data. - */ [email protected] [email protected] -public class INode { - - enum FileType { - DIRECTORY, FILE - } - - public static final FileType[] FILE_TYPES = { - FileType.DIRECTORY, - FileType.FILE - }; - - public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null); - - private FileType fileType; - private Block[] blocks; - - public INode(FileType fileType, Block[] blocks) { - this.fileType = fileType; - if (isDirectory() && blocks != null) { - throw new IllegalArgumentException("A directory cannot contain blocks."); - } - this.blocks = blocks; - } - - public Block[] getBlocks() { - return blocks; - } - - public FileType getFileType() { - return fileType; - } - - public boolean isDirectory() { - return fileType == FileType.DIRECTORY; - } - - public boolean isFile() { - return fileType == FileType.FILE; - } - - public long getSerializedLength() { - return 1L + (blocks == null ? 
0 : 4 + blocks.length * 16); - } - - - public InputStream serialize() throws IOException { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(bytes); - try { - out.writeByte(fileType.ordinal()); - if (isFile()) { - out.writeInt(blocks.length); - for (int i = 0; i < blocks.length; i++) { - out.writeLong(blocks[i].getId()); - out.writeLong(blocks[i].getLength()); - } - } - out.close(); - out = null; - } finally { - IOUtils.closeStream(out); - } - return new ByteArrayInputStream(bytes.toByteArray()); - } - - public static INode deserialize(InputStream in) throws IOException { - if (in == null) { - return null; - } - DataInputStream dataIn = new DataInputStream(in); - FileType fileType = INode.FILE_TYPES[dataIn.readByte()]; - switch (fileType) { - case DIRECTORY: - in.close(); - return INode.DIRECTORY_INODE; - case FILE: - int numBlocks = dataIn.readInt(); - Block[] blocks = new Block[numBlocks]; - for (int i = 0; i < numBlocks; i++) { - long id = dataIn.readLong(); - long length = dataIn.readLong(); - blocks[i] = new Block(id, length); - } - in.close(); - return new INode(fileType, blocks); - default: - throw new IllegalArgumentException("Cannot deserialize inode."); - } - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java deleted file mode 100644 index 40decc2..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.s3; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3.Block; -import org.apache.hadoop.fs.s3.FileSystemStore; -import org.apache.hadoop.fs.s3.INode; -import org.apache.tajo.common.exception.NotImplementedException; - -import java.io.*; -import java.net.URI; -import java.util.*; - -/** - * A stub implementation of {@link FileSystemStore} for testing - * {@link S3FileSystem} without actually connecting to S3. 
- */ -public class InMemoryFileSystemStore implements FileSystemStore { - - private Configuration conf; - private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>(); - private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>(); - - @Override - public void initialize(URI uri, Configuration conf) { - this.conf = conf; - } - - @Override - public String getVersion() throws IOException { - return "0"; - } - - @Override - public void deleteINode(Path path) throws IOException { - inodes.remove(normalize(path)); - } - - @Override - public void deleteBlock(Block block) throws IOException { - blocks.remove(block.getId()); - } - - @Override - public boolean inodeExists(Path path) throws IOException { - return inodes.containsKey(normalize(path)); - } - - @Override - public boolean blockExists(long blockId) throws IOException { - return blocks.containsKey(blockId); - } - - @Override - public INode retrieveINode(Path path) throws IOException { - return inodes.get(normalize(path)); - } - - @Override - public File retrieveBlock(Block block, long byteRangeStart) throws IOException { - byte[] data = blocks.get(block.getId()); - File file = createTempFile(); - BufferedOutputStream out = null; - try { - out = new BufferedOutputStream(new FileOutputStream(file)); - out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart); - } finally { - if (out != null) { - out.close(); - } - } - return file; - } - - private File createTempFile() throws IOException { - File dir = new File(conf.get("fs.s3.buffer.dir")); - if (!dir.exists() && !dir.mkdirs()) { - throw new IOException("Cannot create S3 buffer directory: " + dir); - } - File result = File.createTempFile("test-", ".tmp", dir); - result.deleteOnExit(); - return result; - } - - @Override - public Set<Path> listSubPaths(Path path) throws IOException { - Path normalizedPath = normalize(path); - // This is inefficient but more than adequate for testing purposes. 
- Set<Path> subPaths = new LinkedHashSet<Path>(); - for (Path p : inodes.tailMap(normalizedPath).keySet()) { - if (normalizedPath.equals(p.getParent())) { - subPaths.add(p); - } - } - return subPaths; - } - - @Override - public Set<Path> listDeepSubPaths(Path path) throws IOException { - Path normalizedPath = normalize(path); - String pathString = normalizedPath.toUri().getPath(); - if (!pathString.endsWith("/")) { - pathString += "/"; - } - // This is inefficient but more than adequate for testing purposes. - Set<Path> subPaths = new LinkedHashSet<Path>(); - for (Path p : inodes.tailMap(normalizedPath).keySet()) { - if (p.toUri().getPath().startsWith(pathString)) { - subPaths.add(p); - } - } - return subPaths; - } - - @Override - public void storeINode(Path path, INode inode) throws IOException { - inodes.put(normalize(path), inode); - } - - @Override - public void storeBlock(Block block, File file) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] buf = new byte[8192]; - int numRead; - BufferedInputStream in = null; - try { - in = new BufferedInputStream(new FileInputStream(file)); - while ((numRead = in.read(buf)) >= 0) { - out.write(buf, 0, numRead); - } - } finally { - if (in != null) { - in.close(); - } - } - blocks.put(block.getId(), out.toByteArray()); - } - - private Path normalize(Path path) { - if (!path.isAbsolute()) { - throw new IllegalArgumentException("Path must be absolute: " + path); - } - return new Path(path.toUri().getPath()); - } - - @Override - public void purge() throws IOException { - inodes.clear(); - blocks.clear(); - } - - @Override - public void dump() throws IOException { - throw new NotImplementedException(); - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java ---------------------------------------------------------------------- diff --git 
a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java deleted file mode 100644 index d4034b9..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.s3; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3.Block; -import org.apache.hadoop.fs.s3.FileSystemStore; -import org.apache.hadoop.fs.s3.INode; -import org.apache.hadoop.util.Progressable; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - [email protected] [email protected] -class S3OutputStream extends OutputStream { - - private Configuration conf; - - private int bufferSize; - - private FileSystemStore store; - - private Path path; - - private long blockSize; - - private File backupFile; - - private OutputStream backupStream; - - private Random r = new Random(); - - private boolean closed; - - private int pos = 0; - - private long filePos = 0; - - private int bytesWrittenToBlock = 0; - - private byte[] outBuf; - - private List<Block> blocks = new ArrayList<Block>(); - - private Block nextBlock; - - private static final Log LOG = - LogFactory.getLog(S3OutputStream.class.getName()); - - - public S3OutputStream(Configuration conf, FileSystemStore store, - Path path, long blockSize, Progressable progress, - int buffersize) throws IOException { - - this.conf = conf; - this.store = store; - this.path = path; - this.blockSize = blockSize; - this.backupFile = newBackupFile(); - this.backupStream = new FileOutputStream(backupFile); - this.bufferSize = buffersize; - this.outBuf = new byte[bufferSize]; - - } - - private File newBackupFile() throws IOException { - File dir = new File(conf.get("fs.s3.buffer.dir")); - if (!dir.exists() && !dir.mkdirs()) { - throw new IOException("Cannot create S3 buffer directory: " + 
dir); - } - File result = File.createTempFile("output-", ".tmp", dir); - result.deleteOnExit(); - return result; - } - - public long getPos() throws IOException { - return filePos; - } - - @Override - public synchronized void write(int b) throws IOException { - if (closed) { - throw new IOException("Stream closed"); - } - - if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) { - flush(); - } - outBuf[pos++] = (byte) b; - filePos++; - } - - @Override - public synchronized void write(byte b[], int off, int len) throws IOException { - if (closed) { - throw new IOException("Stream closed"); - } - while (len > 0) { - int remaining = bufferSize - pos; - int toWrite = Math.min(remaining, len); - System.arraycopy(b, off, outBuf, pos, toWrite); - pos += toWrite; - off += toWrite; - len -= toWrite; - filePos += toWrite; - - if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) { - flush(); - } - } - } - - @Override - public synchronized void flush() throws IOException { - if (closed) { - throw new IOException("Stream closed"); - } - - if (bytesWrittenToBlock + pos >= blockSize) { - flushData((int) blockSize - bytesWrittenToBlock); - } - if (bytesWrittenToBlock == blockSize) { - endBlock(); - } - flushData(pos); - } - - private synchronized void flushData(int maxPos) throws IOException { - int workingPos = Math.min(pos, maxPos); - - if (workingPos > 0) { - // - // To the local block backup, write just the bytes - // - backupStream.write(outBuf, 0, workingPos); - - // - // Track position - // - bytesWrittenToBlock += workingPos; - System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos); - pos -= workingPos; - } - } - - private synchronized void endBlock() throws IOException { - // - // Done with local copy - // - backupStream.close(); - - // - // Send it to S3 - // - // TODO: Use passed in Progressable to report progress. 
- nextBlockOutputStream(); - store.storeBlock(nextBlock, backupFile); - Block[] arr = new Block[blocks.size()]; - arr = blocks.toArray(arr); - store.storeINode(path, new INode(INode.FILE_TYPES[1], arr)); - - // - // Delete local backup, start new one - // - boolean b = backupFile.delete(); - if (!b) { - LOG.warn("Ignoring failed delete"); - } - backupFile = newBackupFile(); - backupStream = new FileOutputStream(backupFile); - bytesWrittenToBlock = 0; - } - - private synchronized void nextBlockOutputStream() throws IOException { - long blockId = r.nextLong(); - while (store.blockExists(blockId)) { - blockId = r.nextLong(); - } - nextBlock = new Block(blockId, bytesWrittenToBlock); - blocks.add(nextBlock); - bytesWrittenToBlock = 0; - } - - - @Override - public synchronized void close() throws IOException { - if (closed) { - return; - } - - flush(); - if (filePos == 0 || bytesWrittenToBlock != 0) { - endBlock(); - } - - backupStream.close(); - boolean b = backupFile.delete(); - if (!b) { - LOG.warn("Ignoring failed delete"); - } - - super.close(); - - closed = true; - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java deleted file mode 100644 index fc1c908..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java +++ /dev/null @@ -1,314 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.s3; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.s3.Block; -import org.apache.hadoop.fs.s3.FileSystemStore; -import org.apache.hadoop.fs.s3.INode; -import org.apache.hadoop.fs.s3.S3FileSystem; -import org.apache.hadoop.util.Progressable; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -public class SmallBlockS3FileSystem extends S3FileSystem { - - private URI uri; - - private FileSystemStore store; - - private Path workingDir; - - static class Holder { - private static InMemoryFileSystemStore s; - - public synchronized static FileSystemStore get() { - if(s != null) { - return s; - } - s = new InMemoryFileSystemStore(); - return s; - } - - public synchronized static void set(InMemoryFileSystemStore inMemoryFileSystemStore) { - s = inMemoryFileSystemStore; - } - } - - public SmallBlockS3FileSystem() { - } - - - public SmallBlockS3FileSystem( - InMemoryFileSystemStore inMemoryFileSystemStore) { - Holder.set(inMemoryFileSystemStore); - this.store = inMemoryFileSystemStore; - } - - @Override - public URI getUri() { - return uri; - } - @Override - 
public long getDefaultBlockSize() { - return 10; - } - - @Override - public void initialize(URI uri, Configuration conf) throws IOException { - if (store == null) { - store = Holder.get(); - } - store.initialize(uri, conf); - setConf(conf); - this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); - this.workingDir = - new Path("/user", System.getProperty("user.name")).makeQualified(this); - } - @Override - public boolean isFile(Path path) throws IOException { - INode inode = store.retrieveINode(makeAbsolute(path)); - if (inode == null) { - return false; - } - return inode.isFile(); - } - - private INode checkFile(Path path) throws IOException { - INode inode = store.retrieveINode(makeAbsolute(path)); - if (inode == null) { - throw new IOException("No such file."); - } - if (inode.isDirectory()) { - throw new IOException("Path " + path + " is a directory."); - } - return inode; - } - - @Override - public FileStatus[] listStatus(Path f) throws IOException { - Path absolutePath = makeAbsolute(f); - INode inode = store.retrieveINode(absolutePath); - if (inode == null) { - throw new FileNotFoundException("File " + f + " does not exist."); - } - if (inode.isFile()) { - return new FileStatus[] { - new S3FileStatus(f.makeQualified(this), inode) - }; - } - ArrayList<FileStatus> ret = new ArrayList<FileStatus>(); - for (Path p : store.listSubPaths(absolutePath)) { - ret.add(getFileStatus(p.makeQualified(this))); - } - return ret.toArray(new FileStatus[0]); - } - @Override - public FSDataOutputStream create(Path file, FsPermission permission, - boolean overwrite, int bufferSize, - short replication, long blockSize, Progressable progress) - throws IOException { - - INode inode = store.retrieveINode(makeAbsolute(file)); - if (inode != null) { - if (overwrite) { - delete(file, true); - } else { - throw new IOException("File already exists: " + file); - } - } else { - Path parent = file.getParent(); - if (parent != null) { - if (!mkdirs(parent)) { - throw new 
IOException("Mkdirs failed to create " + parent.toString()); - } - } - } - return new FSDataOutputStream - (new S3OutputStream(getConf(), store, makeAbsolute(file), - blockSize, progress, bufferSize), - statistics); - } - @Override - public boolean mkdirs(Path path, FsPermission permission) throws IOException { - Path absolutePath = makeAbsolute(path); - List<Path> paths = new ArrayList<Path>(); - do { - paths.add(0, absolutePath); - absolutePath = absolutePath.getParent(); - } while (absolutePath != null); - - boolean result = true; - for (Path p : paths) { - result &= mkdir(p); - } - return result; - } - - @Override - public Path getWorkingDirectory() { - return workingDir; - } - - @Override - public boolean rename(Path src, Path dst) throws IOException { - Path absoluteSrc = makeAbsolute(src); - INode srcINode = store.retrieveINode(absoluteSrc); - if (srcINode == null) { - // src path doesn't exist - return false; - } - Path absoluteDst = makeAbsolute(dst); - INode dstINode = store.retrieveINode(absoluteDst); - if (dstINode != null && dstINode.isDirectory()) { - absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); - dstINode = store.retrieveINode(absoluteDst); - } - if (dstINode != null) { - // dst path already exists - can't overwrite - return false; - } - Path dstParent = absoluteDst.getParent(); - if (dstParent != null) { - INode dstParentINode = store.retrieveINode(dstParent); - if (dstParentINode == null || dstParentINode.isFile()) { - // dst parent doesn't exist or is a file - return false; - } - } - return renameRecursive(absoluteSrc, absoluteDst); - } - - private boolean renameRecursive(Path src, Path dst) throws IOException { - INode srcINode = store.retrieveINode(src); - store.storeINode(dst, srcINode); - store.deleteINode(src); - if (srcINode.isDirectory()) { - for (Path oldSrc : store.listDeepSubPaths(src)) { - INode inode = store.retrieveINode(oldSrc); - if (inode == null) { - return false; - } - String oldSrcPath = oldSrc.toUri().getPath(); 
- String srcPath = src.toUri().getPath(); - String dstPath = dst.toUri().getPath(); - Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath)); - store.storeINode(newDst, inode); - store.deleteINode(oldSrc); - } - } - return true; - } - - @Override - public boolean delete(Path path, boolean recursive) throws IOException { - Path absolutePath = makeAbsolute(path); - INode inode = store.retrieveINode(absolutePath); - if (inode == null) { - return false; - } - if (inode.isFile()) { - store.deleteINode(absolutePath); - for (Block block: inode.getBlocks()) { - store.deleteBlock(block); - } - } else { - FileStatus[] contents = null; - try { - contents = listStatus(absolutePath); - } catch(FileNotFoundException fnfe) { - return false; - } - - if ((contents.length !=0) && (!recursive)) { - throw new IOException("Directory " + path.toString() - + " is not empty."); - } - for (FileStatus p:contents) { - if (!delete(p.getPath(), recursive)) { - return false; - } - } - store.deleteINode(absolutePath); - } - return true; - } - - /** - * FileStatus for S3 file systems. 
- */ - @Override - public FileStatus getFileStatus(Path f) throws IOException { - INode inode = store.retrieveINode(makeAbsolute(f)); - if (inode == null) { - throw new FileNotFoundException(f + ": No such file or directory."); - } - return new S3FileStatus(f.makeQualified(this), inode); - } - private boolean mkdir(Path path) throws IOException { - Path absolutePath = makeAbsolute(path); - INode inode = store.retrieveINode(absolutePath); - if (inode == null) { - store.storeINode(absolutePath, INode.DIRECTORY_INODE); - } else if (inode.isFile()) { - throw new IOException(String.format( - "Can't make directory for path %s since it is a file.", - absolutePath)); - } - return true; - } - private Path makeAbsolute(Path path) { - if (path.isAbsolute()) { - return path; - } - return new Path(workingDir, path); - } - - private static class S3FileStatus extends FileStatus { - - S3FileStatus(Path f, INode inode) throws IOException { - super(findLength(inode), inode.isDirectory(), 1, - findBlocksize(inode), 0, f); - } - - private static long findLength(INode inode) { - if (!inode.isDirectory()) { - long length = 0L; - for (Block block : inode.getBlocks()) { - length += block.getLength(); - } - return length; - } - return 0; - } - - private static long findBlocksize(INode inode) { - final Block[] ret = inode.getBlocks(); - return ret == null ? 
0L : ret[0].getLength(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java deleted file mode 100644 index b332364..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.tuple; - -import org.apache.tajo.storage.RowStoreUtil; -import org.apache.tajo.tuple.offheap.*; -import org.junit.Test; - -public class TestBaseTupleBuilder { - - @Test - public void testBuild() { - BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema); - - OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(10248); - OffHeapRowBlockReader reader = rowBlock.getReader(); - - ZeroCopyTuple inputTuple = new ZeroCopyTuple(); - - HeapTuple heapTuple = null; - ZeroCopyTuple zcTuple = null; - int i = 0; - while(reader.next(inputTuple)) { - RowStoreUtil.convert(inputTuple, builder); - - heapTuple = builder.buildToHeapTuple(); - TestOffHeapRowBlock.validateTupleResult(i, heapTuple); - - zcTuple = builder.buildToZeroCopyTuple(); - TestOffHeapRowBlock.validateTupleResult(i, zcTuple); - - i++; - } - } - - @Test - public void testBuildWithNull() { - BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema); - - OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlockWithNull(10248); - OffHeapRowBlockReader reader = rowBlock.getReader(); - - ZeroCopyTuple inputTuple = new ZeroCopyTuple(); - - HeapTuple heapTuple = null; - ZeroCopyTuple zcTuple = null; - int i = 0; - while(reader.next(inputTuple)) { - RowStoreUtil.convert(inputTuple, builder); - - heapTuple = builder.buildToHeapTuple(); - TestOffHeapRowBlock.validateNullity(i, heapTuple); - - zcTuple = builder.buildToZeroCopyTuple(); - TestOffHeapRowBlock.validateNullity(i, zcTuple); - - i++; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java deleted file mode 100644 index 96f465a..0000000 
--- a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
+++ /dev/null
@@ -1,45 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.tuple.offheap;

import org.apache.tajo.catalog.SchemaUtil;
import org.junit.Test;

/**
 * Verifies that a {@link HeapTuple} built from the serialized bytes of a
 * {@link ZeroCopyTuple} reproduces the original row contents.
 */
public class TestHeapTuple {

  @Test
  public void testHeapTuple() {
    final OffHeapRowBlock block = TestOffHeapRowBlock.createRowBlock(1024);
    final OffHeapRowBlockReader scanner = new OffHeapRowBlockReader(block);

    final ZeroCopyTuple zcTuple = new ZeroCopyTuple();
    for (int rowIdx = 0; scanner.next(zcTuple); rowIdx++) {
      // Copy the row's serialized bytes out of the off-heap buffer ...
      final byte[] serialized = new byte[zcTuple.nioBuffer().limit()];
      zcTuple.nioBuffer().get(serialized);

      // ... and validate the heap-backed reconstruction of the same row.
      final HeapTuple reconstructed =
          new HeapTuple(serialized, SchemaUtil.toDataTypes(TestOffHeapRowBlock.schema));
      TestOffHeapRowBlock.validateTupleResult(rowIdx, reconstructed);
    }

    block.release();
  }
}
