http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
new file mode 100644
index 0000000..07e8dd7
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.text.ByteBufLineReader;
+import org.apache.tajo.storage.text.DelimitedLineReader;
+import org.apache.tajo.storage.text.DelimitedTextFile;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+public class TestLineReader {
+  private static final String TEST_PATH = "target/test-data/TestLineReader";
+
+  /** Builds the four-column schema shared by the tests that write a text table. */
+  private static Schema newTestSchema() {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("comment", Type.TEXT);
+    schema.addColumn("comment2", Type.TEXT);
+    return schema;
+  }
+
+  /** Builds one row for {@link #newTestSchema()} with the given id and a NULL last column. */
+  private static VTuple newTestTuple(int id) {
+    VTuple tuple = new VTuple(4);
+    tuple.put(0, DatumFactory.createInt4(id));
+    tuple.put(1, DatumFactory.createInt8(25L));
+    tuple.put(2, DatumFactory.createText("emiya muljomdao"));
+    tuple.put(3, NullDatum.get());
+    return tuple;
+  }
+
+  /**
+   * Writes an uncompressed TEXTFILE table and checks that {@link ByteBufLineReader} returns one
+   * buffer per row and that the consumed byte count equals the file length.
+   */
+  @Test
+  public void testByteBufLineReader() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    // Normally Hadoop's process-wide cached instance, so it is deliberately not closed here.
+    FileSystem fs = testDir.getFileSystem(conf);
+
+    Schema schema = newTestSchema();
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+    Path tablePath = new Path(testDir, "line.data");
+    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+        null, null, meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+    int tupleNum = 10000;
+    for (int i = 0; i < tupleNum; i++) {
+      appender.addTuple(newTestTuple(i + 1));
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
+    ByteBufLineReader reader = new ByteBufLineReader(channel);
+
+    long totalRead = 0;
+    int lines = 0;
+    try {
+      AtomicInteger bytes = new AtomicInteger();
+      for (;;) {
+        ByteBuf buf = reader.readLineBuf(bytes);
+        // Accumulate before the EOF check so the bytes reported by the final call are counted.
+        totalRead += bytes.get();
+        if (buf == null) {
+          break;
+        }
+        lines++;
+      }
+    } finally {
+      IOUtils.cleanup(null, reader, channel);
+    }
+    assertEquals(tupleNum, lines);
+    assertEquals(status.getLen(), totalRead);
+    assertEquals(status.getLen(), reader.readBytes());
+  }
+
+  /**
+   * Writes a Deflate-compressed TEXTFILE table and checks that {@link DelimitedLineReader} returns
+   * every row even though the fragment length is the offset recorded after about half the rows,
+   * because a compressed file is read to EOF.
+   */
+  @Test
+  public void testLineDelimitedReader() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+
+    Schema schema = newTestSchema();
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+    meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
+
+    Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName());
+    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+        null, null, meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+    int tupleNum = 10000;
+
+    long splitOffset = 0;
+    for (int i = 0; i < tupleNum; i++) {
+      appender.addTuple(newTestTuple(i + 1));
+
+      if (i == (tupleNum / 2)) {
+        splitOffset = appender.getOffset();
+      }
+    }
+    // The file actually written carries the appender's extension as a suffix.
+    String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
+    appender.close();
+
+    tablePath = tablePath.suffix(extension);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset);
+    // If the file is compressed, the reader reads to EOF regardless of the fragment length.
+    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment);
+    int lines = 0;
+    try {
+      assertTrue(reader.isCompressed());
+      assertFalse(reader.isReadable());
+      reader.init();
+      assertTrue(reader.isReadable());
+
+      while (reader.isReadable()) {
+        ByteBuf buf = reader.readLine();
+        if (buf == null) {
+          break;
+        }
+        lines++;
+      }
+    } finally {
+      IOUtils.cleanup(null, reader);
+    }
+    assertEquals(tupleNum, lines);
+  }
+
+  /**
+   * Reads the {@code dataset/testLineText.txt} resource (expected, per the test name, not to end
+   * with a line terminator) and checks that every byte and every line is accounted for.
+   */
+  @Test
+  public void testByteBufLineReaderWithoutTerminating() throws IOException {
+    String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile();
+    File file = new File(path);
+    String data = FileUtil.readTextFile(file);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
+    ByteBufLineReader reader = new ByteBufLineReader(channel);
+
+    long totalRead = 0;
+    int lines = 0;
+    try {
+      AtomicInteger bytes = new AtomicInteger();
+      for (;;) {
+        ByteBuf buf = reader.readLineBuf(bytes);
+        totalRead += bytes.get();
+        if (buf == null) {
+          break;
+        }
+        lines++;
+      }
+    } finally {
+      IOUtils.cleanup(null, reader, channel);
+    }
+    assertEquals(file.length(), totalRead);
+    assertEquals(file.length(), reader.readBytes());
+    assertEquals(data.split("\n").length, lines);
+  }
+
+  /**
+   * Writes {@code "0\r\n1\r\n"} and checks that each line comes back as its digit without the CRLF
+   * terminator and that every byte of the file is accounted for.
+   */
+  @Test
+  public void testCRLFLine() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testFile = new Path(CommonTestingUtil.getTestDir(TEST_PATH), "testCRLFLineText.txt");
+
+    FileSystem fs = testFile.getFileSystem(conf);
+    FSDataOutputStream outputStream = fs.create(testFile, true);
+    try {
+      outputStream.write("0\r\n1\r\n".getBytes());
+      outputStream.flush();
+    } finally {
+      IOUtils.closeStream(outputStream);
+    }
+    FileStatus status = fs.getFileStatus(testFile);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(testFile));
+    // A very small initial buffer (presumably to force refills inside a line or a CRLF pair).
+    ByteBufLineReader reader = new ByteBufLineReader(channel, BufferPool.directBuffer(2));
+
+    long totalRead = 0;
+    int lines = 0;
+    try {
+      AtomicInteger bytes = new AtomicInteger();
+      for (;;) {
+        ByteBuf buf = reader.readLineBuf(bytes);
+        totalRead += bytes.get();
+        if (buf == null) {
+          break;
+        }
+        String row = buf.toString(Charset.defaultCharset());
+        assertEquals(lines, Integer.parseInt(row));
+        lines++;
+      }
+    } finally {
+      IOUtils.cleanup(null, reader, channel);
+    }
+    assertEquals(status.getLen(), totalRead);
+    assertEquals(status.getLen(), reader.readBytes());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
new file mode 100644
index 0000000..a0daa7d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.*;
+
+/**
+ * Checks that {@link MergeScanner} reads back every row of two files written with the same
+ * storage format, and that column projection is honoured for projectable formats.
+ */
+@RunWith(Parameterized.class)
+public class TestMergeScanner {
+  private TajoConf conf;
+  StorageManager sm;
+  private static final String TEST_PATH = "target/test-data/TestMergeScanner";
+
+  private static final String TEST_MULTIPLE_FILES_AVRO_SCHEMA =
+      "{\n" +
+      " \"type\": \"record\",\n" +
+      " \"namespace\": \"org.apache.tajo\",\n" +
+      " \"name\": \"testMultipleFiles\",\n" +
+      " \"fields\": [\n" +
+      " { \"name\": \"id\", \"type\": \"int\" },\n" +
+      " { \"name\": \"file\", \"type\": \"string\" },\n" +
+      " { \"name\": \"name\", \"type\": \"string\" },\n" +
+      " { \"name\": \"age\", \"type\": \"long\" }\n" +
+      " ]\n" +
+      "}\n";
+
+  private Path testDir;
+  private StoreType storeType;
+  private FileSystem fs;
+
+  public TestMergeScanner(StoreType storeType) {
+    this.storeType = storeType;
+  }
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        {StoreType.CSV},
+        {StoreType.RAW},
+        {StoreType.RCFILE},
+        {StoreType.PARQUET},
+        {StoreType.SEQUENCEFILE},
+        {StoreType.AVRO},
+        // RowFile requires Byte-buffer read support, so we omitted RowFile.
+        //{StoreType.ROWFILE},
+    });
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = new TajoConf();
+    conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
+    conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
+  }
+
+  /**
+   * Writes two files of {@code tupleNum} rows each, merges them with {@link MergeScanner} using a
+   * two-column projection, and checks the total row count and, for projectable formats, that
+   * only the projected columns are populated.
+   */
+  @Test
+  public void testMultipleFiles() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("file", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    schema.addColumn("age", Type.INT8);
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+          TEST_MULTIPLE_FILES_AVRO_SCHEMA);
+    }
+
+    int tupleNum = 10000;
+    Path table1Path = new Path(testDir, storeType + "_1.data");
+    writeTestTable(meta, schema, table1Path, tupleNum);
+    Path table2Path = new Path(testDir, storeType + "_2.data");
+    writeTestTable(meta, schema, table2Path, tupleNum);
+
+    FileStatus status1 = fs.getFileStatus(table1Path);
+    FileStatus status2 = fs.getFileStatus(table2Path);
+    Fragment[] fragment = new Fragment[2];
+    fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
+    fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
+
+    Schema targetSchema = new Schema();
+    targetSchema.addColumn(schema.getColumn(0));
+    targetSchema.addColumn(schema.getColumn(2));
+
+    Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.newList(fragment), targetSchema);
+    assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
+
+    scanner.init();
+    int totalCounts = 0;
+    try {
+      Tuple tuple;
+      while ((tuple = scanner.next()) != null) {
+        totalCounts++;
+        if (isProjectableStorage(meta.getStoreType())) {
+          // Columns outside the projection (1 and 3) are expected to be null.
+          assertNotNull(tuple.get(0));
+          assertNull(tuple.get(1));
+          assertNotNull(tuple.get(2));
+          assertNull(tuple.get(3));
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+
+    assertEquals(tupleNum * 2, totalCounts);
+  }
+
+  /**
+   * Writes {@code tupleNum} rows of (i + 1, "hyunsik", "jihoon", 25) to {@code path} and, when the
+   * appender collects statistics, checks the recorded row count.
+   */
+  private void writeTestTable(TableMeta meta, Schema schema, Path path, int tupleNum)
+      throws IOException {
+    Appender appender =
+        StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, path);
+    appender.enableStats();
+    appender.init();
+    for (int i = 0; i < tupleNum; i++) {
+      VTuple vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createText("hyunsik"));
+      vTuple.put(2, DatumFactory.createText("jihoon"));
+      vTuple.put(3, DatumFactory.createInt8(25L));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    TableStats stats = appender.getStats();
+    if (stats != null) {
+      assertEquals(tupleNum, stats.getNumRows().longValue());
+    }
+  }
+
+  /** Formats for which the merge scanner is expected to report itself projectable. */
+  private static boolean isProjectableStorage(StoreType type) {
+    switch (type) {
+      case RCFILE:
+      case PARQUET:
+      case SEQUENCEFILE:
+      case CSV:
+      case AVRO:
+        return true;
+      default:
+        return false;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
new file mode 100644
index 0000000..12ea551
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.CharsetUtil;
+import org.apache.tajo.storage.text.FieldSplitProcessor;
+import org.apache.tajo.storage.text.LineSplitProcessor;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static io.netty.util.ReferenceCountUtil.releaseLater;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSplitProcessor {
+
+  /**
+   * Checks that {@link FieldSplitProcessor} stops at each occurrence of the delimiter, including
+   * the empty field between two consecutive delimiters.
+   */
+  @Test
+  public void testFieldSplitProcessor() throws IOException {
+    String data = "abc||de";
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
+
+    final int len = buf.readableBytes();
+    FieldSplitProcessor processor = new FieldSplitProcessor('|');
+
+    // forEachByte returns the index where the processor stopped, or -1 if it scanned the range.
+    assertEquals(3, buf.forEachByte(0, len, processor));
+    assertEquals(4, buf.forEachByte(4, len - 4, processor));
+    assertEquals(-1, buf.forEachByte(5, len - 5, processor));
+  }
+
+  /**
+   * Checks that {@link LineSplitProcessor} stops at CR and at LF, and remembers a preceding CR so
+   * the LF of a CRLF pair can be skipped.
+   */
+  @Test
+  public void testLineSplitProcessor() throws IOException {
+    String data = "abc\r\n\n";
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
+
+    final int len = buf.readableBytes();
+    LineSplitProcessor processor = new LineSplitProcessor();
+
+    // find CR
+    assertEquals(3, buf.forEachByte(0, len, processor));
+
+    // find CRLF
+    assertEquals(4, buf.forEachByte(4, len - 4, processor));
+    assertEquals('\n', buf.getByte(4));
+    // need to skip LF
+    assertTrue(processor.isPrevCharCR());
+
+    // find LF
+    assertEquals(5, buf.forEachByte(5, len - 5, processor)); // line length is zero
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java new file mode 100644 index 0000000..15998f2 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -0,0 +1,878 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.rcfile.RCFile; +import org.apache.tajo.storage.sequencefile.SequenceFileScanner; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.KeyValueSet; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class TestStorages { + private TajoConf conf; + private static String TEST_PATH = "target/test-data/TestStorages"; + + private static String TEST_PROJECTION_AVRO_SCHEMA = + "{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testProjection\",\n" + + " \"fields\": [\n" + + " { \"name\": \"id\", \"type\": \"int\" },\n" + + " { \"name\": \"age\", \"type\": \"long\" },\n" + + " { \"name\": \"score\", 
\"type\": \"float\" }\n" + + " ]\n" + + "}\n"; + + private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA = + "{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testNullHandlingTypes\",\n" + + " \"fields\": [\n" + + " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" + + " { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" + + " { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" + + " { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" + + " { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" + + " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" + + " { \"name\": \"col11\", \"type\": \"null\" },\n" + + " { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" + + " ]\n" + + "}\n"; + + private StoreType storeType; + private boolean splitable; + private boolean statsable; + private boolean seekable; + private Path testDir; + private FileSystem fs; + + public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException { + this.storeType = type; + this.splitable = splitable; + this.statsable = statsable; + this.seekable = seekable; + + conf = new TajoConf(); + + if (storeType == StoreType.RCFILE) { + conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100); + } + + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + fs = testDir.getFileSystem(conf); + } + + @Parameterized.Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][] { + //type, splitable, statsable, seekable + {StoreType.CSV, true, true, true}, + {StoreType.RAW, false, true, true}, + {StoreType.RCFILE, true, true, false}, + {StoreType.PARQUET, 
false, false, false}, + {StoreType.SEQUENCEFILE, true, true, false}, + {StoreType.AVRO, false, false, false}, + {StoreType.TEXTFILE, true, true, false}, + {StoreType.JSON, true, true, false}, + }); + } + + @Test + public void testSplitable() throws IOException { + if (splitable) { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Splitable.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(2); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + appender.addTuple(vTuple); + } + appender.close(); + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + long randomNum = (long) (Math.random() * fileLen) + 1; + + FileFragment[] tablets = new FileFragment[2]; + tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); + tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); + + Scanner scanner = sm.getScanner(meta, schema, tablets[0], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + int tupleCnt = 0; + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + scanner = sm.getScanner(meta, schema, tablets[1], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + } + } + + @Test + public void testRCFileSplitable() throws IOException { + if (storeType == StoreType.RCFILE) { + Schema schema = new 
Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Splitable.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(2); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + appender.addTuple(vTuple); + } + appender.close(); + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + long randomNum = 122; // header size + + FileFragment[] tablets = new FileFragment[2]; + tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); + tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); + + Scanner scanner = sm.getScanner(meta, schema, tablets[0], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + int tupleCnt = 0; + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + scanner = sm.getScanner(meta, schema, tablets[1], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + } + } + + @Test + public void testProjection() throws IOException { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("score", Type.FLOAT4); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); + if (storeType == StoreType.AVRO) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + 
TEST_PROJECTION_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testProjection.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(i + 2)); + vTuple.put(2, DatumFactory.createFloat4(i + 3)); + appender.addTuple(vTuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen()); + + Schema target = new Schema(); + target.addColumn("age", Type.INT8); + target.addColumn("score", Type.FLOAT4); + Scanner scanner = sm.getScanner(meta, schema, fragment, target); + scanner.init(); + int tupleCnt = 0; + Tuple tuple; + while ((tuple = scanner.next()) != null) { + if (storeType == StoreType.RCFILE + || storeType == StoreType.CSV + || storeType == StoreType.PARQUET + || storeType == StoreType.SEQUENCEFILE + || storeType == StoreType.AVRO) { + assertTrue(tuple.get(0) == null); + } + assertTrue(tupleCnt + 2 == tuple.get(1).asInt8()); + assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4()); + tupleCnt++; + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + } + + @Test + public void testVariousTypes() throws IOException { + boolean handleProtobuf = storeType != StoreType.JSON; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", 
Type.NULL_TYPE); + if (handleProtobuf) { + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + } + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); + if (storeType == StoreType.AVRO) { + String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString(); + meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path); + } + + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(11 + (handleProtobuf ? 1 : 0)); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createChar("hyunsik"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("hyunsik"), + DatumFactory.createBlob("hyunsik".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get() + }); + if (handleProtobuf) { + tuple.put(11, factory.createDatum(queryid.getProto())); + } + + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = sm.getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved = scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + } + + @Test + public void testNullHandlingTypes() 
throws IOException { + boolean handleProtobuf = storeType != StoreType.JSON; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + + if (handleProtobuf) { + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + } + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); + meta.putOption(StorageConstants.TEXT_NULL, "\\\\N"); + meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N"); + meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName()); + meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\"); + if (storeType == StoreType.AVRO) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + int columnNum = 11 + (handleProtobuf ? 
1 : 0); + Tuple seedTuple = new VTuple(columnNum); + seedTuple.put(new Datum[]{ + DatumFactory.createBool(true), // 0 + DatumFactory.createChar("hyunsik"), // 2 + DatumFactory.createInt2((short) 17), // 3 + DatumFactory.createInt4(59), // 4 + DatumFactory.createInt8(23l), // 5 + DatumFactory.createFloat4(77.9f), // 6 + DatumFactory.createFloat8(271.9f), // 7 + DatumFactory.createText("hyunsik"), // 8 + DatumFactory.createBlob("hyunsik".getBytes()),// 9 + DatumFactory.createInet4("192.168.0.1"), // 10 + NullDatum.get(), // 11 + }); + + if (handleProtobuf) { + seedTuple.put(11, factory.createDatum(queryid.getProto())); // 12 + } + + // Making tuples with different null column positions + Tuple tuple; + for (int i = 0; i < columnNum; i++) { + tuple = new VTuple(columnNum); + for (int j = 0; j < columnNum; j++) { + if (i == j) { // i'th column will have NULL value + tuple.put(j, NullDatum.get()); + } else { + tuple.put(j, seedTuple.get(j)); + } + } + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + int i = 0; + while ((retrieved = scanner.next()) != null) { + assertEquals(columnNum, retrieved.size()); + for (int j = 0; j < columnNum; j++) { + if (i == j) { + assertEquals(NullDatum.get(), retrieved.get(j)); + } else { + assertEquals(seedTuple.get(j), retrieved.get(j)); + } + } + + i++; + } + scanner.close(); + } + + @Test + public void testRCFileTextSerializeDeserialize() throws IOException { + if(storeType != StoreType.RCFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + 
schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while 
((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testRCFileBinarySerializeDeserialize() throws IOException { + if(storeType != StoreType.RCFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + 
DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testSequenceFileTextSerializeDeserialize() throws IOException { + if(storeType != StoreType.SEQUENCEFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = 
CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), 
scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testSequenceFileBinarySerializeDeserialize() throws IOException { + if(storeType != StoreType.SEQUENCEFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + 
DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName()); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testTime() throws IOException { + if (storeType == StoreType.CSV || storeType == StoreType.RAW) { + Schema schema = new Schema(); + schema.addColumn("col1", Type.DATE); + schema.addColumn("col2", Type.TIME); + schema.addColumn("col3", Type.TIMESTAMP); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + + Path tablePath = new Path(testDir, "testTime.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + + Tuple tuple = new VTuple(3); + tuple.put(new Datum[]{ + DatumFactory.createDate("1980-04-01"), + DatumFactory.createTime("12:34:56"), + DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000)) + }); + 
appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved = scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + } + } + + @Test + public void testSeekableScanner() throws IOException { + if (!seekable) { + return; + } + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("comment", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Seekable.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 100000; + VTuple vTuple; + + List<Long> offsets = Lists.newArrayList(); + offsets.add(0L); + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + vTuple.put(2, DatumFactory.createText("test" + i)); + appender.addTuple(vTuple); + + // find a seek position + if (i % (tupleNum / 3) == 0) { + offsets.add(appender.getOffset()); + } + } + + // end of file + if (!offsets.contains(appender.getOffset())) { + offsets.add(appender.getOffset()); + } + + appender.close(); + if (statsable) { + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + } + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(status.getLen(), appender.getOffset()); + + Scanner scanner; + int tupleCnt = 0; + long prevOffset = 0; + long readBytes = 
0; + long readRows = 0; + for (long offset : offsets) { + scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, + new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema); + scanner.init(); + + while (scanner.next() != null) { + tupleCnt++; + } + + scanner.close(); + if (statsable) { + readBytes += scanner.getInputStats().getNumBytes(); + readRows += scanner.getInputStats().getNumRows(); + } + prevOffset = offset; + } + + assertEquals(tupleNum, tupleCnt); + if (statsable) { + assertEquals(appender.getStats().getNumBytes().longValue(), readBytes); + assertEquals(appender.getStats().getNumRows().longValue(), readRows); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java new file mode 100644 index 0000000..7b83894 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import org.apache.avro.Schema;
+import org.apache.tajo.HttpFileServer;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.NetUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}: resolving an Avro schema from an inline literal, a local file path, an HTTP URL and a file-system URL, each compared with the schema parsed directly from the same .avsc file.
+ */
+public class TestAvroUtil {
+  private Schema expected; // reference schema, parsed directly from the .avsc file in setUp()
+  private URL schemaUrl;   // location of dataset/testVariousTypes.avsc as returned by FileUtil.getResourcePath
+
+  @Before
+  public void setUp() throws Exception {
+    schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc");
+    assertNotNull(schemaUrl);
+    // Parse the .avsc directly; every test compares AvroUtil's result against this value.
+    File file = new File(schemaUrl.getPath());
+    assertTrue(file.exists());
+
+    expected = new Schema.Parser().parse(file);
+  }
+
+  @Test
+  public void testGetSchema() throws IOException, URISyntaxException { // getAvroSchema with each schema-location table option
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+    meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, FileUtil.readTextFile(new File(schemaUrl.getPath())));
+    Schema schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+    assertEquals(expected, schema);
+    // AVRO_SCHEMA_URL given as a plain local file path.
+    meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+    meta.putOption(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath());
+    schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+    assertEquals(expected, schema);
+    // AVRO_SCHEMA_URL given as an http:// URL served by a local HttpFileServer.
+    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+    try {
+      server.start();
+      InetSocketAddress addr = server.getBindAddress();
+      // Constructed with port 0, so the port for the URL is taken from getBindAddress().
+      String url = "http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath();
+      meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+      meta.putOption(StorageConstants.AVRO_SCHEMA_URL, url);
+      schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+    } finally {
+      server.stop();
+    }
+    assertEquals(expected, schema); // checked after server.stop(); schema holds the HTTP-resolved result
+  }
+
+  @Test
+  public void testGetSchemaFromHttp() throws IOException, URISyntaxException {
+    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+    try {
+      server.start();
+      InetSocketAddress addr = server.getBindAddress();
+      // Request the .avsc by its local file path through the HTTP server.
+      Schema schema = AvroUtil.getAvroSchemaFromHttp("http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath());
+      assertEquals(expected, schema);
+    } finally {
+      server.stop();
+    }
+  }
+
+  @Test
+  public void testGetSchemaFromFileSystem() throws IOException, URISyntaxException {
+    Schema schema = AvroUtil.getAvroSchemaFromFileSystem(schemaUrl.toString(), new TajoConf());
+    // schemaUrl.toString() keeps the URL scheme, unlike the getPath() values used in the other tests.
+    assertEquals(expected, schema);
+  }
+}
