http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java index bf13f3c..7900195 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -115,7 +115,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + storeType + ".idx"), @@ -189,7 +189,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"), @@ -281,7 +281,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + storeType + ".idx"), @@ -352,7 +352,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"), @@ -442,7 +442,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"), @@ -522,7 +522,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + storeType + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); @@ -604,7 +604,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + storeType + ".idx"), @@ -709,7 +709,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + storeType + ".idx"), @@ -789,7 +789,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); @@ -880,7 +880,7 @@ public class TestBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java index 78b16c3..c0dda1f 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java @@ -104,7 +104,7 @@ public class TestSingleCSVFileBSTIndex { keySchema.addColumn(new Column("long", Type.INT8)); keySchema.addColumn(new Column("double", Type.FLOAT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, @@ -193,7 +193,7 @@ public class TestSingleCSVFileBSTIndex { keySchema.addColumn(new Column("int", Type.INT4)); keySchema.addColumn(new Column("long", Type.INT8)); - TupleComparator comp = new TupleComparator(keySchema, sortKeys); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"), http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java new file mode 100644 index 0000000..b332364 --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.tuple.offheap.*; +import org.junit.Test; + +public class TestBaseTupleBuilder { + + @Test + public void testBuild() { + BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema); + + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(10248); + OffHeapRowBlockReader reader = rowBlock.getReader(); + + ZeroCopyTuple inputTuple = new ZeroCopyTuple(); + + HeapTuple heapTuple = null; + ZeroCopyTuple zcTuple = null; + int i = 0; + while(reader.next(inputTuple)) { + RowStoreUtil.convert(inputTuple, builder); + + heapTuple = builder.buildToHeapTuple(); + TestOffHeapRowBlock.validateTupleResult(i, heapTuple); + + zcTuple = builder.buildToZeroCopyTuple(); + TestOffHeapRowBlock.validateTupleResult(i, zcTuple); + + i++; + } + } + + @Test + public void testBuildWithNull() { + BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema); + + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlockWithNull(10248); + OffHeapRowBlockReader reader = rowBlock.getReader(); + + ZeroCopyTuple inputTuple = new ZeroCopyTuple(); + + HeapTuple heapTuple = null; + ZeroCopyTuple zcTuple = null; + int i = 0; + while(reader.next(inputTuple)) { + RowStoreUtil.convert(inputTuple, builder); + + heapTuple = builder.buildToHeapTuple(); + TestOffHeapRowBlock.validateNullity(i, heapTuple); + + zcTuple = builder.buildToZeroCopyTuple(); + TestOffHeapRowBlock.validateNullity(i, zcTuple); + + i++; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java new file mode 100644 index 0000000..96f465a --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java @@ -0,0 +1,45 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.catalog.SchemaUtil; +import org.junit.Test; + +public class TestHeapTuple { + + @Test + public void testHeapTuple() { + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(1024); + + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + int i = 0; + while (reader.next(zcTuple)) { + byte [] bytes = new byte[zcTuple.nioBuffer().limit()]; + zcTuple.nioBuffer().get(bytes); + + HeapTuple heapTuple = new HeapTuple(bytes, SchemaUtil.toDataTypes(TestOffHeapRowBlock.schema)); + TestOffHeapRowBlock.validateTupleResult(i, heapTuple); + i++; + } + + rowBlock.release(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java new file mode 100644 index 0000000..c43ba38 --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java @@ -0,0 +1,577 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.storage.BaseTupleComparator; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.ProtoUtil; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +import static org.apache.tajo.common.TajoDataTypes.Type; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestOffHeapRowBlock { + private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class); + public static String UNICODE_FIELD_PREFIX = "abc_ê°ëë¤_"; + public static Schema schema; + + static { + schema = new Schema(); + schema.addColumn("col0", Type.BOOLEAN); + schema.addColumn("col1", Type.INT2); + schema.addColumn("col2", Type.INT4); + schema.addColumn("col3", Type.INT8); + schema.addColumn("col4", Type.FLOAT4); + schema.addColumn("col5", Type.FLOAT8); + schema.addColumn("col6", Type.TEXT); + schema.addColumn("col7", Type.TIMESTAMP); + schema.addColumn("col8", Type.DATE); + schema.addColumn("col9", Type.TIME); + schema.addColumn("col10", Type.INTERVAL); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", + CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName())); + } + + private void explainRowBlockAllocation(OffHeapRowBlock rowBlock, long startTime, long endTime) { + LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " + + (endTime - startTime) + " msec"); + } + + @Test + public void testPutAndReadValidation() { + int rowNum = 1000; + + long allocStart = System.currentTimeMillis(); + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); + long allocEnd = System.currentTimeMillis(); + explainRowBlockAllocation(rowBlock, allocStart, allocEnd); + + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + + ZeroCopyTuple tuple = new ZeroCopyTuple(); + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRow(i, rowBlock.getWriter()); + + reader.reset(); + int j = 0; + while(reader.next(tuple)) { + validateTupleResult(j, tuple); + + j++; + } + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); + + long readStart = System.currentTimeMillis(); + tuple = new ZeroCopyTuple(); + int j = 0; + reader.reset(); + while(reader.next(tuple)) { + validateTupleResult(j, tuple); + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + rowBlock.release(); + } + + @Test + public void testNullityValidation() { + int rowNum = 1000; + + long allocStart = System.currentTimeMillis(); + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); + long allocEnd = System.currentTimeMillis(); + explainRowBlockAllocation(rowBlock, allocStart, allocEnd); + + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + ZeroCopyTuple tuple = new ZeroCopyTuple(); + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRowBlockWithNull(i, rowBlock.getWriter()); + + reader.reset(); + int j = 0; + while(reader.next(tuple)) { + validateNullity(j, tuple); + + j++; + } + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing and nullity validating take " + (writeEnd - writeStart) +" msec"); + + long readStart = System.currentTimeMillis(); + tuple = new ZeroCopyTuple(); + int j = 0; + reader.reset(); + while(reader.next(tuple)) { + validateNullity(j, tuple); + + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + rowBlock.release(); + } + + @Test + public void testEmptyRow() { + int rowNum = 1000; + + long allocStart = System.currentTimeMillis(); + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 10); + long allocEnd = System.currentTimeMillis(); + explainRowBlockAllocation(rowBlock, allocStart, allocEnd); + + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + rowBlock.getWriter().startRow(); + // empty columns + rowBlock.getWriter().endRow(); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing tooks " + (writeEnd - writeStart) + " msec"); + + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new ZeroCopyTuple(); + int j = 0; + reader.reset(); + while(reader.next(tuple)) { + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + rowBlock.release(); + + assertEquals(rowNum, j); + assertEquals(rowNum, rowBlock.rows()); + } + + @Test + public void testSortBenchmark() { + int rowNum = 1000; + + OffHeapRowBlock rowBlock = createRowBlock(rowNum); + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + + List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList(); + + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new ZeroCopyTuple(); + reader.reset(); + while(reader.next(tuple)) { + unSafeTuples.add(tuple); + tuple = new ZeroCopyTuple(); + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + SortSpec sortSpec = new SortSpec(new Column("col2", Type.INT4)); + BaseTupleComparator comparator = new BaseTupleComparator(schema, new SortSpec[] {sortSpec}); + + long sortStart = System.currentTimeMillis(); + Collections.sort(unSafeTuples, comparator); + long sortEnd = System.currentTimeMillis(); + LOG.info("sorting took " + (sortEnd - sortStart) + " msec"); + rowBlock.release(); + } + + @Test + public void testVTuplePutAndGetBenchmark() { + int rowNum = 1000; + + List<VTuple> rowBlock = Lists.newArrayList(); + long writeStart = System.currentTimeMillis(); + VTuple tuple; + for (int i = 0; i < rowNum; i++) { + tuple = new VTuple(schema.size()); + fillVTuple(i, tuple); + rowBlock.add(tuple); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); + + long readStart = System.currentTimeMillis(); + int j = 0; + for (VTuple t : rowBlock) { + validateTupleResult(j, t); + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + int count = 0; + for (int l = 0; l < rowBlock.size(); l++) { + for(int m = 0; m < schema.size(); m++ ) { + if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) { + count ++; + } + } + } + // For preventing unnecessary code elimination optimization. + LOG.info("The number of INT4 values is " + count + "."); + } + + @Test + public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() { + int rowNum = 1000; + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 100); + + long writeStart = System.currentTimeMillis(); + VTuple tuple = new VTuple(schema.size()); + for (int i = 0; i < rowNum; i++) { + fillVTuple(i, tuple); + + RowStoreUtil.convert(tuple, rowBlock.getWriter()); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); + + validateResults(rowBlock); + rowBlock.release(); + } + + @Test + public void testSerDerOfRowBlock() { + int rowNum = 1000; + + OffHeapRowBlock rowBlock = createRowBlock(rowNum); + + ByteBuffer bb = rowBlock.nioBuffer(); + OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); + validateResults(restoredRowBlock); + rowBlock.release(); + } + + @Test + public void testSerDerOfZeroCopyTuple() { + int rowNum = 1000; + + OffHeapRowBlock rowBlock = createRowBlock(rowNum); + + ByteBuffer bb = rowBlock.nioBuffer(); + OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(restoredRowBlock); + + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new ZeroCopyTuple(); + ZeroCopyTuple copyTuple = new ZeroCopyTuple(); + int j = 0; + reader.reset(); + while(reader.next(tuple)) { + ByteBuffer copy = tuple.nioBuffer(); + copyTuple.set(copy, SchemaUtil.toDataTypes(schema)); + + validateTupleResult(j, copyTuple); + + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + rowBlock.release(); + } + + public static OffHeapRowBlock createRowBlock(int rowNum) { + long allocateStart = System.currentTimeMillis(); + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); + long allocatedEnd = System.currentTimeMillis(); + LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " + + (allocatedEnd - allocateStart) + " msec"); + + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRow(i, rowBlock.getWriter()); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing takes " + (writeEnd - writeStart) + " msec"); + + return rowBlock; + } + + public static OffHeapRowBlock createRowBlockWithNull(int rowNum) { + long allocateStart = System.currentTimeMillis(); + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); + long allocatedEnd = System.currentTimeMillis(); + LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " + + (allocatedEnd - allocateStart) + " msec"); + + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRowBlockWithNull(i, rowBlock.getWriter()); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); + + return rowBlock; + } + + public static void fillRow(int i, RowWriter builder) { + builder.startRow(); + builder.putBool(i % 1 == 0 ? true : false); // 0 + builder.putInt2((short) 1); // 1 + builder.putInt4(i); // 2 + builder.putInt8(i); // 3 + builder.putFloat4(i); // 4 + builder.putFloat8(i); // 5 + builder.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 + builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 + builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 + builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 + builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 + builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 + builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 + builder.endRow(); + } + + public static void fillRowBlockWithNull(int i, RowWriter writer) { + writer.startRow(); + + if (i == 0) { + writer.skipField(); + } else { + writer.putBool(i % 1 == 0 ? true : false); // 0 + } + if (i % 1 == 0) { + writer.skipField(); + } else { + writer.putInt2((short) 1); // 1 + } + + if (i % 2 == 0) { + writer.skipField(); + } else { + writer.putInt4(i); // 2 + } + + if (i % 3 == 0) { + writer.skipField(); + } else { + writer.putInt8(i); // 3 + } + + if (i % 4 == 0) { + writer.skipField(); + } else { + writer.putFloat4(i); // 4 + } + + if (i % 5 == 0) { + writer.skipField(); + } else { + writer.putFloat8(i); // 5 + } + + if (i % 6 == 0) { + writer.skipField(); + } else { + writer.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 + } + + if (i % 7 == 0) { + writer.skipField(); + } else { + writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 + } + + if (i % 8 == 0) { + writer.skipField(); + } else { + writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 + } + + if (i % 9 == 0) { + writer.skipField(); + } else { + writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 + } + + if (i % 10 == 0) { + writer.skipField(); + } else { + writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 + } + + if (i % 11 == 0) { + writer.skipField(); + } else { + writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 + } + + if (i % 12 == 0) { + writer.skipField(); + } else { + writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 + } + + writer.endRow(); + } + + public static void fillVTuple(int i, VTuple tuple) { + tuple.put(0, DatumFactory.createBool(i % 1 == 0)); + tuple.put(1, DatumFactory.createInt2((short) 1)); + tuple.put(2, DatumFactory.createInt4(i)); + tuple.put(3, DatumFactory.createInt8(i)); + tuple.put(4, DatumFactory.createFloat4(i)); + tuple.put(5, DatumFactory.createFloat8(i)); + tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes())); + tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7 + tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8 + tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9 + tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10 + tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11 + tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12; + } + + public static void validateResults(OffHeapRowBlock rowBlock) { + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new ZeroCopyTuple(); + int j = 0; + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + reader.reset(); + while(reader.next(tuple)) { + validateTupleResult(j, tuple); + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("Reading takes " + (readEnd - readStart) + " msec"); + } + + public static void validateTupleResult(int j, Tuple t) { + assertTrue((j % 1 == 0) == t.getBool(0)); + assertTrue(1 == t.getInt2(1)); + assertEquals(j, t.getInt4(2)); + assertEquals(j, t.getInt8(3)); + assertTrue(j == t.getFloat4(4)); + assertTrue(j == t.getFloat8(5)); + assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6)); + assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7)); + assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8)); + assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9)); + assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10)); + assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11)); + assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12)); + } + + public static void validateNullity(int j, Tuple tuple) { + if (j == 0) { + tuple.isNull(0); + } else { + assertTrue((j % 1 == 0) == tuple.getBool(0)); + } + + if (j % 1 == 0) { + tuple.isNull(1); + } else { + assertTrue(1 == tuple.getInt2(1)); + } + + if (j % 2 == 0) { + tuple.isNull(2); + } else { + assertEquals(j, tuple.getInt4(2)); + } + + if (j % 3 == 0) { + tuple.isNull(3); + } else { + assertEquals(j, tuple.getInt8(3)); + } + + if (j % 4 == 0) { + tuple.isNull(4); + } else { + assertTrue(j == tuple.getFloat4(4)); + } + + if (j % 5 == 0) { + tuple.isNull(5); + } else { + assertTrue(j == tuple.getFloat8(5)); + } + + if (j % 6 == 0) { + tuple.isNull(6); + } else { + assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6)); + } + + if (j % 7 == 0) { + tuple.isNull(7); + } else { + assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7)); + } + + if (j % 8 == 0) { + tuple.isNull(8); + } else { + assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8)); + } + + if (j % 9 == 0) { + tuple.isNull(9); + } else { + assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9)); + } + + if (j % 10 == 0) { + tuple.isNull(10); + } else { + assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10)); + } + + if (j % 11 == 0) { + tuple.isNull(11); + } else { + assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11)); + } + + if (j % 12 == 0) { + tuple.isNull(12); + } else { + assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/514ed847/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java new file mode 100644 index 0000000..1eb9c17 --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.unit.StorageUnit; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestResizableSpec { + + @Test + public void testResizableLimit() { + ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f); + + long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); + + assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); + + assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB)); + + assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB)); + + assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1); + + assertFalse(limit.canIncrease(limit.limit())); + } + + @Test + public void testFixedLimit() { + FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f); + + assertEquals(limit.limit(), 100 * StorageUnit.MB); + + assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000)); + + assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB)); + + assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB)); + + assertFalse(limit.canIncrease(limit.limit())); + } +} \ No newline at end of file
