http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java index 0000000,0000000..c43ba38 new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java @@@ -1,0 -1,0 +1,577 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.tajo.tuple.offheap; ++ ++import com.google.common.collect.Lists; ++import org.apache.commons.logging.Log; ++import org.apache.commons.logging.LogFactory; ++import org.apache.tajo.catalog.*; ++import org.apache.tajo.common.TajoDataTypes; ++import org.apache.tajo.datum.DatumFactory; ++import org.apache.tajo.datum.ProtobufDatum; ++import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; ++import org.apache.tajo.storage.BaseTupleComparator; ++import org.apache.tajo.storage.RowStoreUtil; ++import org.apache.tajo.storage.Tuple; ++import org.apache.tajo.storage.VTuple; ++import org.apache.tajo.unit.StorageUnit; ++import org.apache.tajo.util.FileUtil; ++import org.apache.tajo.util.ProtoUtil; ++import org.junit.Test; ++ ++import java.nio.ByteBuffer; ++import java.util.Collections; ++import java.util.List; ++ ++import static org.apache.tajo.common.TajoDataTypes.Type; ++import static org.junit.Assert.assertEquals; ++import static org.junit.Assert.assertTrue; ++ ++public class TestOffHeapRowBlock { ++ private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class); ++ public static String UNICODE_FIELD_PREFIX = "abc_가나다_"; ++ public static Schema schema; ++ ++ static { ++ schema = new Schema(); ++ schema.addColumn("col0", Type.BOOLEAN); ++ schema.addColumn("col1", Type.INT2); ++ schema.addColumn("col2", Type.INT4); ++ schema.addColumn("col3", Type.INT8); ++ schema.addColumn("col4", Type.FLOAT4); ++ schema.addColumn("col5", Type.FLOAT8); ++ schema.addColumn("col6", Type.TEXT); ++ schema.addColumn("col7", Type.TIMESTAMP); ++ schema.addColumn("col8", Type.DATE); ++ schema.addColumn("col9", Type.TIME); ++ schema.addColumn("col10", Type.INTERVAL); ++ schema.addColumn("col11", Type.INET4); ++ schema.addColumn("col12", ++ CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName())); ++ } ++ ++ private void explainRowBlockAllocation(OffHeapRowBlock rowBlock, long startTime, long endTime) { ++ LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " ++ + (endTime - startTime) + " msec"); ++ } ++ ++ @Test ++ public void testPutAndReadValidation() { ++ int 
rowNum = 1000; ++ ++ long allocStart = System.currentTimeMillis(); ++ OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); ++ long allocEnd = System.currentTimeMillis(); ++ explainRowBlockAllocation(rowBlock, allocStart, allocEnd); ++ ++ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); ++ ++ ZeroCopyTuple tuple = new ZeroCopyTuple(); ++ long writeStart = System.currentTimeMillis(); ++ for (int i = 0; i < rowNum; i++) { ++ fillRow(i, rowBlock.getWriter()); ++ ++ reader.reset(); ++ int j = 0; ++ while(reader.next(tuple)) { ++ validateTupleResult(j, tuple); ++ ++ j++; ++ } ++ } ++ long writeEnd = System.currentTimeMillis(); ++ LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); ++ ++ long readStart = System.currentTimeMillis(); ++ tuple = new ZeroCopyTuple(); ++ int j = 0; ++ reader.reset(); ++ while(reader.next(tuple)) { ++ validateTupleResult(j, tuple); ++ j++; ++ } ++ long readEnd = System.currentTimeMillis(); ++ LOG.info("reading takes " + (readEnd - readStart) + " msec"); ++ ++ rowBlock.release(); ++ } ++ ++ @Test ++ public void testNullityValidation() { ++ int rowNum = 1000; ++ ++ long allocStart = System.currentTimeMillis(); ++ OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); ++ long allocEnd = System.currentTimeMillis(); ++ explainRowBlockAllocation(rowBlock, allocStart, allocEnd); ++ ++ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); ++ ZeroCopyTuple tuple = new ZeroCopyTuple(); ++ long writeStart = System.currentTimeMillis(); ++ for (int i = 0; i < rowNum; i++) { ++ fillRowBlockWithNull(i, rowBlock.getWriter()); ++ ++ reader.reset(); ++ int j = 0; ++ while(reader.next(tuple)) { ++ validateNullity(j, tuple); ++ ++ j++; ++ } ++ } ++ long writeEnd = System.currentTimeMillis(); ++ LOG.info("writing and nullity validating take " + (writeEnd - writeStart) + " msec"); ++ ++ long readStart = System.currentTimeMillis(); ++ tuple = new ZeroCopyTuple(); ++ int j = 0; ++ reader.reset(); ++ while(reader.next(tuple)) { ++ validateNullity(j, tuple); ++ ++ j++; ++ } ++ long readEnd = System.currentTimeMillis(); ++ LOG.info("reading takes " + (readEnd - readStart) + " msec"); ++ ++ rowBlock.release(); ++ } ++ ++ @Test ++ public void testEmptyRow() { ++ int rowNum = 1000; ++ ++ long allocStart = System.currentTimeMillis(); ++ OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 10); ++ long allocEnd = System.currentTimeMillis(); ++ explainRowBlockAllocation(rowBlock, allocStart, allocEnd); ++ ++ long writeStart = System.currentTimeMillis(); ++ for (int i = 0; i < rowNum; i++) { ++ rowBlock.getWriter().startRow(); ++ // empty columns ++ rowBlock.getWriter().endRow(); ++ } ++ long writeEnd = System.currentTimeMillis(); ++ LOG.info("writing took " + (writeEnd - writeStart) + " msec"); ++ ++ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); ++ ++ long readStart = System.currentTimeMillis(); ++ ZeroCopyTuple tuple = new ZeroCopyTuple(); ++ int j = 0; ++ reader.reset(); ++ while(reader.next(tuple)) { ++ j++; ++ } ++ long readEnd = System.currentTimeMillis(); ++ LOG.info("reading takes " + (readEnd - readStart) + " msec"); ++ rowBlock.release(); ++ ++ assertEquals(rowNum, j); ++ assertEquals(rowNum, rowBlock.rows()); ++ } ++ ++ @Test ++ public void testSortBenchmark() { ++ int rowNum = 1000; ++ ++ OffHeapRowBlock rowBlock = createRowBlock(rowNum); ++ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); ++ ++ List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList(); ++ ++ 
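++ // Note: a ZeroCopyTuple is only a view onto the row block's off-heap memory, so the loop below adds a fresh instance per row instead of reusing a single tuple.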
long readStart = System.currentTimeMillis(); ++ ZeroCopyTuple tuple = new ZeroCopyTuple(); ++ reader.reset(); ++ while(reader.next(tuple)) { ++ unSafeTuples.add(tuple); ++ tuple = new ZeroCopyTuple(); ++ } ++ long readEnd = System.currentTimeMillis(); ++ LOG.info("reading takes " + (readEnd - readStart) + " msec"); ++ ++ SortSpec sortSpec = new SortSpec(new Column("col2", Type.INT4)); ++ BaseTupleComparator comparator = new BaseTupleComparator(schema, new SortSpec[] {sortSpec}); ++ ++ long sortStart = System.currentTimeMillis(); ++ Collections.sort(unSafeTuples, comparator); ++ long sortEnd = System.currentTimeMillis(); ++ LOG.info("sorting took " + (sortEnd - sortStart) + " msec"); ++ rowBlock.release(); ++ } ++ ++ @Test ++ public void testVTuplePutAndGetBenchmark() { ++ int rowNum = 1000; ++ ++ List<VTuple> rowBlock = Lists.newArrayList(); ++ long writeStart = System.currentTimeMillis(); ++ VTuple tuple; ++ for (int i = 0; i < rowNum; i++) { ++ tuple = new VTuple(schema.size()); ++ fillVTuple(i, tuple); ++ rowBlock.add(tuple); ++ } ++ long writeEnd = System.currentTimeMillis(); ++ LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); ++ ++ long readStart = System.currentTimeMillis(); ++ int j = 0; ++ for (VTuple t : rowBlock) { ++ validateTupleResult(j, t); ++ j++; ++ } ++ long readEnd = System.currentTimeMillis(); ++ LOG.info("reading takes " + (readEnd - readStart) + " msec"); ++ ++ int count = 0; ++ for (int l = 0; l < rowBlock.size(); l++) { ++ for(int m = 0; m < schema.size(); m++ ) { ++ if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) { ++ count ++; ++ } ++ } ++ } ++ // For preventing unnecessary code elimination optimization. ++ LOG.info("The number of INT4 values is " + count + "."); ++ } ++ ++ @Test ++ public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() { ++ int rowNum = 1000; ++ ++ OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 100); ++ ++ long writeStart = System.currentTimeMillis(); ++ VTuple tuple = new VTuple(schema.size()); ++ for (int i = 0; i < rowNum; i++) { ++ fillVTuple(i, tuple); ++ ++ RowStoreUtil.convert(tuple, rowBlock.getWriter()); ++ } ++ long writeEnd = System.currentTimeMillis(); ++ LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); ++ ++ validateResults(rowBlock); ++ rowBlock.release(); ++ } ++ ++ @Test ++ public void testSerDerOfRowBlock() { ++ int rowNum = 1000; ++ ++ OffHeapRowBlock rowBlock = createRowBlock(rowNum); ++ ++ ByteBuffer bb = rowBlock.nioBuffer(); ++ OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); ++ validateResults(restoredRowBlock); ++ rowBlock.release(); ++ } ++ ++ @Test ++ public void testSerDerOfZeroCopyTuple() { ++ int rowNum = 1000; ++ ++ OffHeapRowBlock rowBlock = createRowBlock(rowNum); ++ ++ ByteBuffer bb = rowBlock.nioBuffer(); ++ OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); ++ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(restoredRowBlock); ++ ++ long readStart = System.currentTimeMillis(); ++ ZeroCopyTuple tuple = new ZeroCopyTuple(); ++ ZeroCopyTuple copyTuple = new ZeroCopyTuple(); ++ int j = 0; ++ reader.reset(); ++ while(reader.next(tuple)) { ++ ByteBuffer copy = tuple.nioBuffer(); ++ copyTuple.set(copy, SchemaUtil.toDataTypes(schema)); ++ ++ validateTupleResult(j, copyTuple); ++ ++ j++; ++ } ++ long readEnd = System.currentTimeMillis(); ++ LOG.info("reading takes " + (readEnd - readStart) + " msec"); ++ ++ rowBlock.release(); ++ } ++ ++ public static OffHeapRowBlock createRowBlock(int 
rowNum) { ++ long allocateStart = System.currentTimeMillis(); ++ OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); ++ long allocatedEnd = System.currentTimeMillis(); ++ LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " ++ + (allocatedEnd - allocateStart) + " msec"); ++ ++ long writeStart = System.currentTimeMillis(); ++ for (int i = 0; i < rowNum; i++) { ++ fillRow(i, rowBlock.getWriter()); ++ } ++ long writeEnd = System.currentTimeMillis(); ++ LOG.info("writing takes " + (writeEnd - writeStart) + " msec"); ++ ++ return rowBlock; ++ } ++ ++ public static OffHeapRowBlock createRowBlockWithNull(int rowNum) { ++ long allocateStart = System.currentTimeMillis(); ++ OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); ++ long allocatedEnd = System.currentTimeMillis(); ++ LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " ++ + (allocatedEnd - allocateStart) + " msec"); ++ ++ long writeStart = System.currentTimeMillis(); ++ for (int i = 0; i < rowNum; i++) { ++ fillRowBlockWithNull(i, rowBlock.getWriter()); ++ } ++ long writeEnd = System.currentTimeMillis(); ++ LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); ++ ++ return rowBlock; ++ } ++ ++ public static void fillRow(int i, RowWriter builder) { ++ builder.startRow(); ++ builder.putBool(i % 1 == 0 ? true : false); // 0 ++ builder.putInt2((short) 1); // 1 ++ builder.putInt4(i); // 2 ++ builder.putInt8(i); // 3 ++ builder.putFloat4(i); // 4 ++ builder.putFloat8(i); // 5 ++ builder.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 ++ builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 ++ builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 ++ builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 ++ builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 ++ builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 ++ builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 ++ builder.endRow(); ++ } ++ ++ public static void fillRowBlockWithNull(int i, RowWriter writer) { ++ writer.startRow(); ++ ++ if (i == 0) { ++ writer.skipField(); ++ } else { ++ writer.putBool(i % 1 == 0 ? 
true : false); // 0 ++ } ++ if (i % 1 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putInt2((short) 1); // 1 ++ } ++ ++ if (i % 2 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putInt4(i); // 2 ++ } ++ ++ if (i % 3 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putInt8(i); // 3 ++ } ++ ++ if (i % 4 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putFloat4(i); // 4 ++ } ++ ++ if (i % 5 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putFloat8(i); // 5 ++ } ++ ++ if (i % 6 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 ++ } ++ ++ if (i % 7 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 ++ } ++ ++ if (i % 8 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 ++ } ++ ++ if (i % 9 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 ++ } ++ ++ if (i % 10 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 ++ } ++ ++ if (i % 11 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 ++ } ++ ++ if (i % 12 == 0) { ++ writer.skipField(); ++ } else { ++ writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 ++ } ++ ++ writer.endRow(); ++ } ++ ++ public static void fillVTuple(int i, VTuple tuple) { ++ tuple.put(0, DatumFactory.createBool(i % 1 == 0)); ++ tuple.put(1, DatumFactory.createInt2((short) 1)); ++ tuple.put(2, DatumFactory.createInt4(i)); ++ tuple.put(3, DatumFactory.createInt8(i)); ++ tuple.put(4, DatumFactory.createFloat4(i)); ++ tuple.put(5, DatumFactory.createFloat8(i)); ++ tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes())); ++ tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7 ++ tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8 ++ tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9 ++ tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10 ++ tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11 ++ tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12; ++ } ++ ++ public static void validateResults(OffHeapRowBlock rowBlock) { ++ long readStart = System.currentTimeMillis(); ++ ZeroCopyTuple tuple = new ZeroCopyTuple(); ++ int j = 0; ++ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); ++ reader.reset(); ++ while(reader.next(tuple)) { ++ validateTupleResult(j, tuple); ++ j++; ++ } ++ long readEnd = System.currentTimeMillis(); ++ LOG.info("Reading takes " + (readEnd - readStart) + " msec"); ++ } ++ ++ public static void validateTupleResult(int j, Tuple t) { ++ assertTrue((j % 1 == 0) == t.getBool(0)); ++ assertTrue(1 == t.getInt2(1)); ++ assertEquals(j, t.getInt4(2)); ++ assertEquals(j, t.getInt8(3)); ++ assertTrue(j == t.getFloat4(4)); ++ assertTrue(j == t.getFloat8(5)); ++ assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6)); ++ assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7)); ++ assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8)); ++ 
assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9)); ++ assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10)); ++ assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11)); ++ assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12)); ++ } ++ ++ public static void validateNullity(int j, Tuple tuple) { ++ if (j == 0) { ++ assertTrue(tuple.isNull(0)); ++ } else { ++ assertTrue((j % 1 == 0) == tuple.getBool(0)); ++ } ++ ++ if (j % 1 == 0) { ++ assertTrue(tuple.isNull(1)); ++ } else { ++ assertTrue(1 == tuple.getInt2(1)); ++ } ++ ++ if (j % 2 == 0) { ++ assertTrue(tuple.isNull(2)); ++ } else { ++ assertEquals(j, tuple.getInt4(2)); ++ } ++ ++ if (j % 3 == 0) { ++ assertTrue(tuple.isNull(3)); ++ } else { ++ assertEquals(j, tuple.getInt8(3)); ++ } ++ ++ if (j % 4 == 0) { ++ assertTrue(tuple.isNull(4)); ++ } else { ++ assertTrue(j == tuple.getFloat4(4)); ++ } ++ ++ if (j % 5 == 0) { ++ assertTrue(tuple.isNull(5)); ++ } else { ++ assertTrue(j == tuple.getFloat8(5)); ++ } ++ ++ if (j % 6 == 0) { ++ assertTrue(tuple.isNull(6)); ++ } else { ++ assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6)); ++ } ++ ++ if (j % 7 == 0) { ++ assertTrue(tuple.isNull(7)); ++ } else { ++ assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7)); ++ } ++ ++ if (j % 8 == 0) { ++ assertTrue(tuple.isNull(8)); ++ } else { ++ assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8)); ++ } ++ ++ if (j % 9 == 0) { ++ assertTrue(tuple.isNull(9)); ++ } else { ++ assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9)); ++ } ++ ++ if (j % 10 == 0) { ++ assertTrue(tuple.isNull(10)); ++ } else { ++ assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10)); ++ } ++ ++ if (j % 11 == 0) { ++ assertTrue(tuple.isNull(11)); ++ } else { ++ assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11)); ++ } ++ ++ if (j % 12 == 0) { ++ assertTrue(tuple.isNull(12)); ++ } else { ++ assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12)); ++ } ++ } ++}
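For reference, the write/read cycle these tests exercise reduces to the following minimal sketch. It uses only calls that appear in the test above (OffHeapRowBlock, RowWriter, OffHeapRowBlockReader, ZeroCopyTuple); the two-column schema and literal values are illustrative assumptions:

    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);               // assumed illustrative columns
    schema.addColumn("name", Type.TEXT);

    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB); // off-heap allocation
    rowBlock.getWriter().startRow();                 // one row = startRow .. endRow
    rowBlock.getWriter().putInt4(1);
    rowBlock.getWriter().putText("tajo".getBytes());
    rowBlock.getWriter().endRow();

    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
    ZeroCopyTuple tuple = new ZeroCopyTuple();       // a view over the block, no per-row heap copy
    reader.reset();
    while (reader.next(tuple)) {
      assertEquals(1, tuple.getInt4(0));
    }
    rowBlock.release();                              // off-heap memory must be freed explicitly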
http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java index 0000000,0000000..1eb9c17 new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java @@@ -1,0 -1,0 +1,59 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.tajo.tuple.offheap; ++ ++import org.apache.tajo.unit.StorageUnit; ++import org.junit.Test; ++ ++import static org.junit.Assert.*; ++ ++public class TestResizableSpec { ++ ++ @Test ++ public void testResizableLimit() { ++ ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f); ++ ++ long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); ++ ++ assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); ++ ++ assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB)); ++ ++ assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB)); ++ ++ assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1); ++ ++ assertFalse(limit.canIncrease(limit.limit())); ++ } ++ ++ @Test ++ public void testFixedLimit() { ++ FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f); ++ ++ assertEquals(limit.limit(), 100 * StorageUnit.MB); ++ ++ assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000)); ++ ++ assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB)); ++ ++ assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB)); ++ ++ assertFalse(limit.canIncrease(limit.limit())); ++ } ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java index 0000000,0000000..0b3755d new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java @@@ -1,0 -1,0 +1,37 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.tajo.storage; ++ ++import io.netty.buffer.ByteBuf; ++import org.apache.tajo.catalog.Column; ++import org.apache.tajo.datum.Datum; ++import org.apache.tajo.storage.text.TextLineParsingError; ++ ++import java.io.IOException; ++import java.io.OutputStream; ++ ++ ++public interface FieldSerializerDeserializer { ++ ++ public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException; ++ ++ public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) ++ throws IOException, TextLineParsingError; ++ ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java index 85f91cc,0000000..dbb8bd0 mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java @@@ -1,219 -1,0 +1,220 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.avro; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.FileAppender; +import org.apache.tajo.storage.TableStatistics; +import org.apache.tajo.storage.Tuple; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * FileAppender for writing to Avro files. + */ +public class AvroAppender extends FileAppender { + private TableStatistics stats; + private Schema avroSchema; + private List<Schema.Field> avroFields; + private DataFileWriter<GenericRecord> dataFileWriter; + + /** + * Creates a new AvroAppender. + * + * @param conf Configuration properties. ++ * @param taskAttemptId The task attempt id + * @param schema The table schema. + * @param meta The table metadata. + * @param workDir The path of the Avro file to write to. + */ + public AvroAppender(Configuration conf, + QueryUnitAttemptId taskAttemptId, + org.apache.tajo.catalog.Schema schema, + TableMeta meta, Path workDir) throws IOException { + super(conf, taskAttemptId, schema, meta, workDir); + } + + /** + * Initializes the Appender. + */ + public void init() throws IOException { + FileSystem fs = path.getFileSystem(conf); + if (!fs.exists(path.getParent())) { + throw new FileNotFoundException(path.toString()); + } + FSDataOutputStream outputStream = fs.create(path); + + avroSchema = AvroUtil.getAvroSchema(meta, conf); + avroFields = avroSchema.getFields(); + + DatumWriter<GenericRecord> datumWriter = + new GenericDatumWriter<GenericRecord>(avroSchema); + dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); + dataFileWriter.create(avroSchema, outputStream); + + if (enabledStats) { + this.stats = new TableStatistics(schema); + } + super.init(); + } + + /** + * Gets the current offset. Tracking offsets is currently not implemented, so + * this method always returns 0. + * + * @return 0 + */ + @Override + public long getOffset() throws IOException { + return 0; + } + + private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) { + if (tuple.get(i) instanceof NullDatum) { + return null; + } + switch (avroType) { + case NULL: + return null; + case BOOLEAN: + return tuple.getBool(i); + case INT: + return tuple.getInt4(i); + case LONG: + return tuple.getInt8(i); + case FLOAT: + return tuple.getFloat4(i); + case DOUBLE: + return tuple.getFloat8(i); + case BYTES: + case FIXED: + return ByteBuffer.wrap(tuple.getBytes(i)); + case STRING: + return tuple.getText(i); + default: + throw new RuntimeException("Unknown primitive type."); + } + } + + /** + * Write a Tuple to the Avro file. + * + * @param tuple The Tuple to write. 
+ */ + @Override + public void addTuple(Tuple tuple) throws IOException { + GenericRecord record = new GenericData.Record(avroSchema); + for (int i = 0; i < schema.size(); ++i) { + Column column = schema.getColumn(i); + if (enabledStats) { + stats.analyzeField(i, tuple.get(i)); + } + Object value; + Schema.Field avroField = avroFields.get(i); + Schema.Type avroType = avroField.schema().getType(); + switch (avroType) { + case NULL: + case BOOLEAN: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BYTES: + case STRING: + case FIXED: + value = getPrimitive(tuple, i, avroType); + break; + case RECORD: + throw new RuntimeException("Avro RECORD not supported."); + case ENUM: + throw new RuntimeException("Avro ENUM not supported."); + case MAP: + throw new RuntimeException("Avro MAP not supported."); + case UNION: + List<Schema> schemas = avroField.schema().getTypes(); + if (schemas.size() != 2) { + throw new RuntimeException("Avro UNION not supported."); + } + if (schemas.get(0).getType().equals(Schema.Type.NULL)) { + value = getPrimitive(tuple, i, schemas.get(1).getType()); + } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) { + value = getPrimitive(tuple, i, schemas.get(0).getType()); + } else { + throw new RuntimeException("Avro UNION not supported."); + } + break; + default: + throw new RuntimeException("Unknown type: " + avroType); + } + record.put(i, value); + } + dataFileWriter.append(record); + + if (enabledStats) { + stats.incrementRow(); + } + } + + /** + * Flushes the current state of the file. + */ + @Override + public void flush() throws IOException { + dataFileWriter.flush(); + } + + /** + * Closes the Appender. + */ + @Override + public void close() throws IOException { + dataFileWriter.close(); + } + + /** + * If table statistics is enabled, retrieve the table statistics. + * + * @return Table statistics if enabled or null otherwise. + */ + @Override + public TableStats getStats() { + if (enabledStats) { + return stats.getTableStat(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java index c1f63a8,0000000..dfe36f6 mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@@ -1,220 -1,0 +1,225 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.json; + + +import io.netty.buffer.ByteBuf; +import net.minidev.json.JSONArray; +import net.minidev.json.JSONObject; +import net.minidev.json.parser.JSONParser; ++import net.minidev.json.parser.ParseException; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.common.exception.NotImplementedException; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; ++import org.apache.tajo.datum.TextDatum; ++import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.text.TextLineDeserializer; +import org.apache.tajo.storage.text.TextLineParsingError; + +import java.io.IOException; +import java.util.Iterator; + +public class JsonLineDeserializer extends TextLineDeserializer { + private JSONParser parser; + private Type [] types; + private String [] columnNames; + + public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { + super(schema, meta, targetColumnIndexes); + } + + @Override + public void init() { + types = SchemaUtil.toTypes(schema); + columnNames = SchemaUtil.toSimpleNames(schema); + + parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE); + } + + @Override + public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError { + byte [] line = new byte[buf.readableBytes()]; + buf.readBytes(line); + + try { + JSONObject object = (JSONObject) parser.parse(line); + + for (int i = 0; i < targetColumnIndexes.length; i++) { + int actualIdx = targetColumnIndexes[i]; + String fieldName = columnNames[actualIdx]; + + if (!object.containsKey(fieldName)) { + output.put(actualIdx, NullDatum.get()); + continue; + } + + switch (types[actualIdx]) { + case BOOLEAN: + String boolStr = object.getAsString(fieldName); + if (boolStr != null) { + output.put(actualIdx, DatumFactory.createBool(boolStr.equals("true"))); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case CHAR: + String charStr = object.getAsString(fieldName); + if (charStr != null) { + output.put(actualIdx, DatumFactory.createChar(charStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case INT1: + case INT2: + Number int2Num = object.getAsNumber(fieldName); + if (int2Num != null) { + output.put(actualIdx, DatumFactory.createInt2(int2Num.shortValue())); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case INT4: + Number int4Num = object.getAsNumber(fieldName); + if (int4Num != null) { + output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue())); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case INT8: + Number int8Num = object.getAsNumber(fieldName); + if (int8Num != null) { + output.put(actualIdx, DatumFactory.createInt8(int8Num.longValue())); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case FLOAT4: + Number float4Num = object.getAsNumber(fieldName); + if (float4Num != null) { + output.put(actualIdx, DatumFactory.createFloat4(float4Num.floatValue())); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case FLOAT8: + Number float8Num = object.getAsNumber(fieldName); + if (float8Num != null) { + output.put(actualIdx, DatumFactory.createFloat8(float8Num.doubleValue())); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case TEXT: + String textStr = 
object.getAsString(fieldName); + if (textStr != null) { + output.put(actualIdx, DatumFactory.createText(textStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case TIMESTAMP: + String timestampStr = object.getAsString(fieldName); + if (timestampStr != null) { + output.put(actualIdx, DatumFactory.createTimestamp(timestampStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case TIME: + String timeStr = object.getAsString(fieldName); + if (timeStr != null) { + output.put(actualIdx, DatumFactory.createTime(timeStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case DATE: + String dateStr = object.getAsString(fieldName); + if (dateStr != null) { + output.put(actualIdx, DatumFactory.createDate(dateStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case BIT: + case BINARY: + case VARBINARY: + case BLOB: { + Object jsonObject = object.get(fieldName); + + if (jsonObject == null) { + output.put(actualIdx, NullDatum.get()); + break; - } if (jsonObject instanceof String) { - output.put(actualIdx, DatumFactory.createBlob((String)jsonObject)); ++ } ++ if (jsonObject instanceof String) { ++ output.put(actualIdx, DatumFactory.createBlob((String) jsonObject)); + } else if (jsonObject instanceof JSONArray) { + JSONArray jsonArray = (JSONArray) jsonObject; + byte[] bytes = new byte[jsonArray.size()]; + Iterator<Object> it = jsonArray.iterator(); + int arrayIdx = 0; + while (it.hasNext()) { + bytes[arrayIdx++] = ((Long) it.next()).byteValue(); + } + if (bytes.length > 0) { + output.put(actualIdx, DatumFactory.createBlob(bytes)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + } else { + throw new IOException("Unknown json object: " + object.getClass().getSimpleName()); + } + break; + } + case INET4: + String inetStr = object.getAsString(fieldName); + if (inetStr != null) { + output.put(actualIdx, DatumFactory.createInet4(inetStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + + case NULL_TYPE: + output.put(actualIdx, NullDatum.get()); + break; + + default: + throw new NotImplementedException(types[actualIdx].name() + " is not supported."); + } + } - ++ } catch (ParseException pe) { ++ throw new TextLineParsingError(new String(line, TextDatum.DEFAULT_CHARSET), pe); + } catch (Throwable e) { + throw new IOException(e); + } + } + + @Override + public void release() { + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 7848198,0000000..8824e3e mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@@ -1,475 -1,0 +1,481 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.compress.CodecPool; +import org.apache.tajo.storage.exception.AlreadyExistsStorageException; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; +import org.apache.tajo.util.ReflectionUtil; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM; +import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM; + +public class DelimitedTextFile { + + public static final byte LF = '\n'; + + private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class); + + /** Caches line serde classes. */ + private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache = + new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>(); + + /** + * By default, DelimitedTextFileScanner uses CSVLineSerDe. If a table property 'text.serde.class' is given, + * it will use the specified serde class. + * + * @return TextLineSerDe + */ + public static TextLineSerDe getLineSerde(TableMeta meta) { + TextLineSerDe lineSerde; + + String serDeClassName; + + // if there is no given serde class, it will use the CSV line serde. + serDeClassName = meta.getOption(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_TEXT_SERDE_CLASS); + + try { + Class<? extends TextLineSerDe> serdeClass; + + if (serdeClassCache.containsKey(serDeClassName)) { + serdeClass = serdeClassCache.get(serDeClassName); + } else { + serdeClass = (Class<? 
extends TextLineSerDe>) Class.forName(serDeClassName); + serdeClassCache.put(serDeClassName, serdeClass); + } + lineSerde = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass); + } catch (Throwable e) { + throw new RuntimeException("TextLineSerde class cannot be initialized.", e); + } + + return lineSerde; + } + + public static class DelimitedTextFileAppender extends FileAppender { + private final TableMeta meta; + private final Schema schema; + private final FileSystem fs; + private FSDataOutputStream fos; + private DataOutputStream outputStream; + private CompressionOutputStream deflateFilter; + private TableStatistics stats = null; + private Compressor compressor; + private CompressionCodecFactory codecFactory; + private CompressionCodec codec; + private Path compressedPath; + private byte[] nullChars; + private int BUFFER_SIZE = 128 * 1024; + private int bufferedBytes = 0; + private long pos = 0; + + private NonSyncByteArrayOutputStream os; + private TextLineSerializer serializer; + + public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + final Schema schema, final TableMeta meta, final Path path) + throws IOException { + super(conf, taskAttemptId, schema, meta, path); + this.fs = path.getFileSystem(conf); + this.meta = meta; + this.schema = schema; + } + + public TextLineSerDe getLineSerde() { + return DelimitedTextFile.getLineSerde(meta); + } + + @Override + public void init() throws IOException { + if (!fs.exists(path.getParent())) { + throw new FileNotFoundException(path.toString()); + } + + if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { + String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC); + codecFactory = new CompressionCodecFactory(conf); + codec = codecFactory.getCodecByClassName(codecName); + compressor = CodecPool.getCompressor(codec); + if (compressor != null) compressor.reset(); //builtin gzip is null + + String extension = codec.getDefaultExtension(); + compressedPath = path.suffix(extension); + + if (fs.exists(compressedPath)) { + throw new AlreadyExistsStorageException(compressedPath); + } + + fos = fs.create(compressedPath); + deflateFilter = codec.createOutputStream(fos, compressor); + outputStream = new DataOutputStream(deflateFilter); + + } else { + if (fs.exists(path)) { + throw new AlreadyExistsStorageException(path); + } + fos = fs.create(path); + outputStream = new DataOutputStream(new BufferedOutputStream(fos)); + } + + if (enabledStats) { + this.stats = new TableStatistics(this.schema); + } + + serializer = getLineSerde().createSerializer(schema, meta); + serializer.init(); + + if (os == null) { + os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); + } + + os.reset(); + pos = fos.getPos(); + bufferedBytes = 0; + super.init(); + } + + @Override + public void addTuple(Tuple tuple) throws IOException { + // write + int rowBytes = serializer.serialize(os, tuple); + + // new line + os.write(LF); + rowBytes += 1; + + // update positions + pos += rowBytes; + bufferedBytes += rowBytes; + + // flush the buffer if it is full + if (bufferedBytes > BUFFER_SIZE) { + flushBuffer(); + } + // Statistical section + if (enabledStats) { + stats.incrementRow(); + } + } + + private void flushBuffer() throws IOException { + if (os.getLength() > 0) { + os.writeTo(outputStream); + os.reset(); + bufferedBytes = 0; + } + } + + @Override + public long getOffset() throws IOException { + return pos; + } + + @Override + public void flush() throws IOException { + flushBuffer(); + outputStream.flush(); + } + + 
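+ // Note: addTuple() only appends serialized rows to the in-memory buffer 'os'; flushBuffer() + // copies the buffer to the underlying stream once BUFFER_SIZE (128 KB) is exceeded, and + // close() calls flush() before finishing the compression codec so no buffered rows are lost.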
@Override + public void close() throws IOException { + + try { + serializer.release(); + + if(outputStream != null){ + flush(); + } + + // Statistical section + if (enabledStats) { + stats.setNumBytes(getOffset()); + } + + if (deflateFilter != null) { + deflateFilter.finish(); + deflateFilter.resetState(); + deflateFilter = null; + } + + os.close(); + } finally { + IOUtils.cleanup(LOG, fos); + if (compressor != null) { + CodecPool.returnCompressor(compressor); + compressor = null; + } + } + } + + @Override + public TableStats getStats() { + if (enabledStats) { + return stats.getTableStat(); + } else { + return null; + } + } + + public boolean isCompress() { + return compressor != null; + } + + public String getExtension() { + return codec != null ? codec.getDefaultExtension() : ""; + } + } + + public static class DelimitedTextFileScanner extends FileScanner { + private boolean splittable = false; + private final long startOffset; + + private final long endOffset; + /** The number of actual read records */ + private int recordCount = 0; + private int[] targetColumnIndexes; + + private DelimitedLineReader reader; + private TextLineDeserializer deserializer; + + private int errorPrintOutMaxNum = 5; + /** Maximum number of permissible errors */ + private int errorTorrenceMaxNum; + /** How many errors have occurred? */ + private int errorNum; + + public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta, + final Fragment fragment) + throws IOException { + super(conf, schema, meta, fragment); + reader = new DelimitedLineReader(conf, this.fragment); + if (!reader.isCompressed()) { + splittable = true; + } + + startOffset = this.fragment.getStartKey(); + endOffset = startOffset + fragment.getLength(); + + errorTorrenceMaxNum = + Integer.parseInt(meta.getOption(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM)); + } + + + @Override + public void init() throws IOException { + if (reader != null) { + reader.close(); + } ++ + reader = new DelimitedLineReader(conf, fragment); + reader.init(); + recordCount = 0; + + if (targets == null) { + targets = schema.toArray(); + } + + targetColumnIndexes = new int[targets.length]; + for (int i = 0; i < targets.length; i++) { + targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); + } + + super.init(); + Arrays.sort(targetColumnIndexes); + if (LOG.isDebugEnabled()) { + LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset); + } + + if (startOffset > 0) { + reader.readLine(); // skip first line; + } + + deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes); + deserializer.init(); + } + + public TextLineSerDe getLineSerde() { + return DelimitedTextFile.getLineSerde(meta); + } + + @Override + public float getProgress() { + try { + if (!reader.isReadable()) { + return 1.0f; + } + long filePos = reader.getCompressedPosition(); + if (startOffset == filePos) { + return 0.0f; + } else { + long readBytes = filePos - startOffset; + long remainingBytes = Math.max(endOffset - filePos, 0); + return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes)); + } + } catch (IOException e) { + LOG.error(e.getMessage(), e); + return 0.0f; + } + } + + @Override + public Tuple next() throws IOException { ++ VTuple tuple; + + if (!reader.isReadable()) { + return null; + } + - if (targets.length == 0) { - return EmptyTuple.get(); - } - - VTuple tuple = new VTuple(schema.size()); - + try { + + // this loop will continue 
until one tuple is built or EOS (end of stream). + do { + + ByteBuf buf = reader.readLine(); ++ ++ // if there are no more lines, return null (end of stream) + if (buf == null) { + return null; + } + - try { ++ // If there is no required column, we just read each line ++ // and then return an empty tuple without parsing the line. ++ if (targets.length == 0) { ++ recordCount++; ++ return EmptyTuple.get(); ++ } + ++ tuple = new VTuple(schema.size()); ++ ++ try { + deserializer.deserialize(buf, tuple); + // if a line is read normally, it exits this loop. + break; + + } catch (TextLineParsingError tae) { + + errorNum++; + + // suppress too many log prints, which could cause performance degradation + if (errorNum < errorPrintOutMaxNum) { + LOG.warn("Ignore JSON Parse Error (" + errorNum + "): ", tae); + } + + // Only when the maximum error tolerance limit is set (i.e., errorTorrenceMaxNum >= 0), + // it checks if the number of parsing errors exceeds the max limit. + // Otherwise, it will ignore all parsing errors. + if (errorTorrenceMaxNum >= 0 && errorNum > errorTorrenceMaxNum) { + throw tae; + } + continue; + } + + } while (reader.isReadable()); // continue until EOS + + // recordCount means the number of actually read records. We increment the count here. + recordCount++; + + return tuple; + + } catch (Throwable t) { + LOG.error(t); + throw new IOException(t); + } + } + + @Override + public void reset() throws IOException { + init(); + } + + @Override + public void close() throws IOException { + try { + if (deserializer != null) { + deserializer.release(); + } + + if (tableStats != null && reader != null) { + tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) + tableStats.setNumRows(recordCount); + } + if (LOG.isDebugEnabled()) { + LOG.debug("DelimitedTextFileScanner processed record:" + recordCount); + } + } finally { + IOUtils.cleanup(LOG, reader); + reader = null; + } + } + + @Override + public boolean isProjectable() { + return true; + } + + @Override + public boolean isSelectable() { + return false; + } + + @Override + public void setSearchCondition(Object expr) { + } + + @Override + public boolean isSplittable() { + return splittable; + } + + @Override + public TableStats getInputStats() { + if (tableStats != null && reader != null) { + tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) + tableStats.setNumRows(recordCount); + tableStats.setNumBytes(fragment.getLength()); + } + return tableStats; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java index 0000000,0000000..8749925 new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java @@@ -1,0 -1,0 +1,163 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.tajo.storage; ++ ++import com.google.common.base.Preconditions; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.fs.FileStatus; ++import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.Path; ++import org.apache.tajo.catalog.CatalogUtil; ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.catalog.proto.CatalogProtos; ++import org.apache.tajo.common.TajoDataTypes.Type; ++import org.apache.tajo.conf.TajoConf; ++import org.apache.tajo.datum.Datum; ++import org.apache.tajo.datum.DatumFactory; ++import org.apache.tajo.storage.fragment.FileFragment; ++import org.apache.tajo.util.FileUtil; ++import org.junit.Test; ++ ++import java.io.File; ++import java.io.IOException; ++import java.net.URL; ++ ++import static org.junit.Assert.*; ++ ++public class TestDelimitedTextFile { ++ ++ private static Schema schema = new Schema(); ++ ++ private static Tuple baseTuple = new VTuple(10); ++ ++ static { ++ schema.addColumn("col1", Type.BOOLEAN); ++ schema.addColumn("col2", Type.CHAR, 7); ++ schema.addColumn("col3", Type.INT2); ++ schema.addColumn("col4", Type.INT4); ++ schema.addColumn("col5", Type.INT8); ++ schema.addColumn("col6", Type.FLOAT4); ++ schema.addColumn("col7", Type.FLOAT8); ++ schema.addColumn("col8", Type.TEXT); ++ schema.addColumn("col9", Type.BLOB); ++ schema.addColumn("col10", Type.INET4); ++ ++ baseTuple.put(new Datum[] { ++ DatumFactory.createBool(true), // 0 ++ DatumFactory.createChar("hyunsik"), // 1 ++ DatumFactory.createInt2((short) 17), // 2 ++ DatumFactory.createInt4(59), // 3 ++ DatumFactory.createInt8(23l), // 4 ++ DatumFactory.createFloat4(77.9f), // 5 ++ DatumFactory.createFloat8(271.9d), // 6 ++ DatumFactory.createText("hyunsik"), // 7 ++ DatumFactory.createBlob("hyunsik".getBytes()),// 8 ++ DatumFactory.createInet4("192.168.0.1"), // 9 ++ }); ++ } ++ ++ public static Path getResourcePath(String path, String suffix) { ++ URL resultBaseURL = ClassLoader.getSystemResource(path); ++ return new Path(resultBaseURL.toString(), suffix); ++ } ++ ++ public static Path getResultPath(Class clazz, String fileName) { ++ return new Path (getResourcePath("results", clazz.getSimpleName()), fileName); ++ } ++ ++ public static String getResultText(Class clazz, String fileName) throws IOException { ++ FileSystem localFS = FileSystem.getLocal(new Configuration()); ++ Path path = getResultPath(clazz, fileName); ++ Preconditions.checkState(localFS.exists(path) && localFS.isFile(path)); ++ return FileUtil.readTextFile(new File(path.toUri())); ++ } ++ ++ private static final FileFragment getFileFragment(String fileName) throws IOException { ++ TajoConf conf = new TajoConf(); ++ Path tablePath = new Path(getResourcePath("dataset", "TestDelimitedTextFile"), fileName); ++ FileSystem fs = FileSystem.getLocal(conf); ++ FileStatus status = fs.getFileStatus(tablePath); ++ return new FileFragment("table", tablePath, 0, status.getLen()); ++ } ++ ++ @Test ++ public void testIgnoreAllErrors() throws IOException { ++ TajoConf conf = new TajoConf(); ++ ++ TableMeta meta 
= CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON); ++ meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1"); ++ FileFragment fragment = getFileFragment("testErrorTolerance1.json"); ++ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); ++ scanner.init(); ++ ++ Tuple tuple; ++ int i = 0; ++ while ((tuple = scanner.next()) != null) { ++ assertEquals(baseTuple, tuple); ++ i++; ++ } ++ assertEquals(3, i); ++ scanner.close(); ++ } ++ ++ @Test ++ public void testIgnoreOneErrorTolerance() throws IOException { ++ ++ ++ TajoConf conf = new TajoConf(); ++ ++ TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON); ++ meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); ++ FileFragment fragment = getFileFragment("testErrorTolerance1.json"); ++ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); ++ scanner.init(); ++ ++ assertNotNull(scanner.next()); ++ assertNotNull(scanner.next()); ++ try { ++ scanner.next(); ++ } catch (IOException ioe) { ++ System.out.println(ioe); ++ return; ++ } finally { ++ scanner.close(); ++ } ++ fail(); ++ } ++ ++ @Test ++ public void testNoErrorTolerance() throws IOException { ++ TajoConf conf = new TajoConf(); ++ TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON); ++ meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0"); ++ FileFragment fragment = getFileFragment("testErrorTolerance2.json"); ++ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); ++ scanner.init(); ++ ++ try { ++ scanner.next(); ++ } catch (IOException ioe) { ++ return; ++ } finally { ++ scanner.close(); ++ } ++ fail(); ++ } ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java index 0000000,0000000..d8e359f new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java @@@ -1,0 -1,0 +1,193 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */
++
++package org.apache.tajo.storage;
++
++import io.netty.buffer.ByteBuf;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.io.IOUtils;
++import org.apache.hadoop.io.compress.DeflateCodec;
++import org.apache.tajo.catalog.CatalogUtil;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
++import org.apache.tajo.common.TajoDataTypes.Type;
++import org.apache.tajo.conf.TajoConf;
++import org.apache.tajo.datum.DatumFactory;
++import org.apache.tajo.datum.NullDatum;
++import org.apache.tajo.storage.fragment.FileFragment;
++import org.apache.tajo.storage.text.ByteBufLineReader;
++import org.apache.tajo.storage.text.DelimitedLineReader;
++import org.apache.tajo.storage.text.DelimitedTextFile;
++import org.apache.tajo.util.CommonTestingUtil;
++import org.apache.tajo.util.FileUtil;
++import org.junit.Test;
++
++import java.io.File;
++import java.io.FileInputStream;
++import java.io.IOException;
++import java.util.concurrent.atomic.AtomicInteger;
++
++import static org.junit.Assert.*;
++
++public class TestLineReader {
++  private static final String TEST_PATH = "target/test-data/TestLineReader";
++
++  @Test
++  public void testByteBufLineReader() throws IOException {
++    TajoConf conf = new TajoConf();
++    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
++    FileSystem fs = testDir.getFileSystem(conf);
++
++    Schema schema = new Schema();
++    schema.addColumn("id", Type.INT4);
++    schema.addColumn("age", Type.INT8);
++    schema.addColumn("comment", Type.TEXT);
++    schema.addColumn("comment2", Type.TEXT);
++
++    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
++    Path tablePath = new Path(testDir, "line.data");
++    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
++        null, null, meta, schema, tablePath);
++    appender.enableStats();
++    appender.init();
++    int tupleNum = 10000;
++    VTuple vTuple;
++
++    for (int i = 0; i < tupleNum; i++) {
++      vTuple = new VTuple(4);
++      vTuple.put(0, DatumFactory.createInt4(i + 1));
++      vTuple.put(1, DatumFactory.createInt8(25L));
++      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
++      vTuple.put(3, NullDatum.get());
++      appender.addTuple(vTuple);
++    }
++    appender.close();
++
++    FileStatus status = fs.getFileStatus(tablePath);
++
++    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
++    assertEquals(status.getLen(), channel.available());
++    ByteBufLineReader reader = new ByteBufLineReader(channel);
++    assertEquals(status.getLen(), reader.available());
++
++    long totalRead = 0;
++    int i = 0;
++    AtomicInteger bytes = new AtomicInteger();
++    for (;;) {
++      ByteBuf buf = reader.readLineBuf(bytes);
++      if (buf == null) break;
++
++      totalRead += bytes.get();
++      i++;
++    }
++    IOUtils.cleanup(null, reader, channel, fs);
++    assertEquals(tupleNum, i);
++    assertEquals(status.getLen(), totalRead);
++    assertEquals(status.getLen(), reader.readBytes());
++  }
++
++  @Test
++  public void testLineDelimitedReader() throws IOException {
++    TajoConf conf = new TajoConf();
++    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
++    FileSystem fs = testDir.getFileSystem(conf);
++
++    Schema schema = new Schema();
++    schema.addColumn("id", Type.INT4);
++    schema.addColumn("age", Type.INT8);
++    schema.addColumn("comment", Type.TEXT);
++    schema.addColumn("comment2", Type.TEXT);
++
++    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
++    meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
++
++    Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName());
++    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
++        null, null, meta, schema, tablePath);
++    appender.enableStats();
++    appender.init();
++    int tupleNum = 10000;
++    VTuple vTuple;
++
++    long splitOffset = 0;
++    for (int i = 0; i < tupleNum; i++) {
++      vTuple = new VTuple(4);
++      vTuple.put(0, DatumFactory.createInt4(i + 1));
++      vTuple.put(1, DatumFactory.createInt8(25L));
++      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
++      vTuple.put(3, NullDatum.get());
++      appender.addTuple(vTuple);
++
++      if (i == (tupleNum / 2)) {
++        splitOffset = appender.getOffset();
++      }
++    }
++    String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
++    appender.close();
++
++    tablePath = tablePath.suffix(extension);
++    FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset);
++    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // if the file is compressed, the reader reads to EOF
++    assertTrue(reader.isCompressed());
++    assertFalse(reader.isReadable());
++    reader.init();
++    assertTrue(reader.isReadable());
++
++    int i = 0;
++    while (reader.isReadable()) {
++      ByteBuf buf = reader.readLine();
++      if (buf == null) break;
++      i++;
++    }
++
++    IOUtils.cleanup(null, reader, fs);
++    assertEquals(tupleNum, i);
++  }
++
++  @Test
++  public void testByteBufLineReaderWithoutTerminating() throws IOException {
++    String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile();
++    File file = new File(path);
++    String data = FileUtil.readTextFile(file);
++
++    ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
++
++    assertEquals(file.length(), channel.available());
++    ByteBufLineReader reader = new ByteBufLineReader(channel);
++    assertEquals(file.length(), reader.available());
++
++    long totalRead = 0;
++    int i = 0;
++    AtomicInteger bytes = new AtomicInteger();
++    for (;;) {
++      ByteBuf buf = reader.readLineBuf(bytes);
++      if (buf == null) break;
++      totalRead += bytes.get();
++      i++;
++    }
++    IOUtils.cleanup(null, reader);
++    assertEquals(file.length(), totalRead);
++    assertEquals(file.length(), reader.readBytes());
++    assertEquals(data.split("\n").length, i);
++  }
++}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
index 0000000,0000000..12ea551
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
@@@ -1,0 -1,0 +1,72 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage;
++
++import io.netty.buffer.ByteBuf;
++import io.netty.buffer.Unpooled;
++import io.netty.util.CharsetUtil;
++import org.apache.tajo.storage.text.FieldSplitProcessor;
++import org.apache.tajo.storage.text.LineSplitProcessor;
++import org.junit.Test;
++
++import java.io.IOException;
++
++import static io.netty.util.ReferenceCountUtil.releaseLater;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++public class TestSplitProcessor {
++
++  @Test
++  public void testFieldSplitProcessor() throws IOException {
++    String data = "abc||de";
++    final ByteBuf buf = releaseLater(
++        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
++
++    final int len = buf.readableBytes();
++    FieldSplitProcessor processor = new FieldSplitProcessor('|');
++
++    assertEquals(3, buf.forEachByte(0, len, processor));
++    assertEquals(4, buf.forEachByte(4, len - 4, processor));
++    assertEquals(-1, buf.forEachByte(5, len - 5, processor));
++  }
++
++  @Test
++  public void testLineSplitProcessor() throws IOException {
++    String data = "abc\r\n\n";
++    final ByteBuf buf = releaseLater(
++        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
++
++    final int len = buf.readableBytes();
++    LineSplitProcessor processor = new LineSplitProcessor();
++
++    // find the CR at index 3
++    assertEquals(3, buf.forEachByte(0, len, processor));
++
++    // find the LF of the CRLF pair at index 4
++    assertEquals(4, buf.forEachByte(4, len - 4, processor));
++    assertEquals('\n', buf.getByte(4));
++    // the LF must be skipped because the preceding character was a CR
++    assertTrue(processor.isPrevCharCR());
++
++    // find the lone LF at index 5, i.e. a zero-length line
++    assertEquals(5, buf.forEachByte(5, len - 5, processor));
++  }
++}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
index 0000000,0000000..70282d9
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
@@@ -1,0 -1,0 +1,101 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.json;
++
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.tajo.catalog.CatalogUtil;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.catalog.proto.CatalogProtos;
++import org.apache.tajo.common.TajoDataTypes;
++import org.apache.tajo.conf.TajoConf;
++import org.apache.tajo.datum.Datum;
++import org.apache.tajo.datum.DatumFactory;
++import org.apache.tajo.datum.NullDatum;
++import org.apache.tajo.storage.Scanner;
++import org.apache.tajo.storage.StorageManager;
++import org.apache.tajo.storage.Tuple;
++import org.apache.tajo.storage.VTuple;
++import org.apache.tajo.storage.fragment.FileFragment;
++import org.junit.Test;
++
++import java.io.IOException;
++import java.net.URL;
++
++import static org.junit.Assert.*;
++
++public class TestJsonSerDe {
++  private static final Schema schema = new Schema();
++
++  static {
++    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
++    schema.addColumn("col2", TajoDataTypes.Type.CHAR, 7);
++    schema.addColumn("col3", TajoDataTypes.Type.INT2);
++    schema.addColumn("col4", TajoDataTypes.Type.INT4);
++    schema.addColumn("col5", TajoDataTypes.Type.INT8);
++    schema.addColumn("col6", TajoDataTypes.Type.FLOAT4);
++    schema.addColumn("col7", TajoDataTypes.Type.FLOAT8);
++    schema.addColumn("col8", TajoDataTypes.Type.TEXT);
++    schema.addColumn("col9", TajoDataTypes.Type.BLOB);
++    schema.addColumn("col10", TajoDataTypes.Type.INET4);
++    schema.addColumn("col11", TajoDataTypes.Type.NULL_TYPE);
++  }
++
++  public static Path getResourcePath(String path, String suffix) {
++    URL resultBaseURL = ClassLoader.getSystemResource(path);
++    return new Path(resultBaseURL.toString(), suffix);
++  }
++
++  @Test
++  public void testVariousType() throws IOException {
++    TajoConf conf = new TajoConf();
++
++    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
++    Path tablePath = new Path(getResourcePath("dataset", "TestJsonSerDe"), "testVariousType.json");
++    FileSystem fs = FileSystem.getLocal(conf);
++    FileStatus status = fs.getFileStatus(tablePath);
++    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
++    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
++    scanner.init();
++
++    Tuple tuple = scanner.next();
++    assertNotNull(tuple);
++    assertNull(scanner.next());
++    scanner.close();
++
++    Tuple baseTuple = new VTuple(11);
++    baseTuple.put(new Datum[] {
++        DatumFactory.createBool(true),                 // 0
++        DatumFactory.createChar("hyunsik"),            // 1
++        DatumFactory.createInt2((short) 17),           // 2
++        DatumFactory.createInt4(59),                   // 3
++        DatumFactory.createInt8(23L),                  // 4
++        DatumFactory.createFloat4(77.9f),              // 5
++        DatumFactory.createFloat8(271.9d),             // 6
++        DatumFactory.createText("hyunsik"),            // 7
++        DatumFactory.createBlob("hyunsik".getBytes()), // 8
++        DatumFactory.createInet4("192.168.0.1"),       // 9
++        NullDatum.get(),                               // 10
++    });
++
++    assertEquals(baseTuple, tuple);
++  }
++}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
index 0000000,0000000..739dfe7
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
@@@ -1,0 -1,0 +1,6 @@@
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
index 0000000,0000000..8256b72
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
@@@ -1,0 -1,0 +1,4 @@@
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
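
Read together, the three error-tolerance tests in TestDelimitedTextFile pin down the contract of the StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM option: "-1" skips every malformed JSON row (the fixture above has three well-formed rows, hence assertEquals(3, i)), "0" makes the first malformed row fatal, and a positive value allows that many skips before Scanner.next() throws IOException. A minimal caller-side sketch of that contract, built only from calls that already appear in the tests; the tolerance value "2" is an arbitrary illustration, and it reuses the test class's schema and getFileFragment() helper:

    // Sketch, not part of the patch: scan a JSON table while tolerating up to
    // two malformed rows before next() is allowed to fail with IOException.
    TajoConf conf = new TajoConf();
    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "2");
    FileFragment fragment = getFileFragment("testErrorTolerance1.json");
    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
    scanner.init();
    try {
      int rowCount = 0;                             // hypothetical local
      Tuple tuple;
      while ((tuple = scanner.next()) != null) {    // malformed rows within tolerance are skipped
        rowCount++;                                 // only successfully parsed rows arrive here
      }
    } finally {
      scanner.close();                              // close even when tolerance is exhausted
    }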

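The TestLineReader cases likewise share one reading loop, which is the usage pattern the new reader exposes: wrap an input stream in a ByteBufInputChannel, call readLineBuf(AtomicInteger) until it returns null, and take each line's consumed byte count from the AtomicInteger. A condensed sketch under the same assumptions as the tests; the file path is a placeholder:

    // Sketch, not part of the patch: count lines and bytes in a local text file.
    // Per testByteBufLineReaderWithoutTerminating, the loop also handles a
    // final line that lacks a terminating newline.
    File file = new File("/path/to/lines.txt");      // placeholder path
    ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
    ByteBufLineReader reader = new ByteBufLineReader(channel);
    AtomicInteger lineBytes = new AtomicInteger();
    long totalRead = 0;
    int lines = 0;
    for (;;) {
      ByteBuf buf = reader.readLineBuf(lineBytes);   // null signals EOF
      if (buf == null) break;
      totalRead += lineBytes.get();                  // bytes consumed for this line
      lines++;
    }
    IOUtils.cleanup(null, reader, channel);          // afterwards, totalRead == reader.readBytes()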