http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
index 0000000,0000000..c43ba38
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
@@@ -1,0 -1,0 +1,577 @@@
++/***
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.tuple.offheap;
++
++import com.google.common.collect.Lists;
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.tajo.catalog.*;
++import org.apache.tajo.common.TajoDataTypes;
++import org.apache.tajo.datum.DatumFactory;
++import org.apache.tajo.datum.ProtobufDatum;
++import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
++import org.apache.tajo.storage.BaseTupleComparator;
++import org.apache.tajo.storage.RowStoreUtil;
++import org.apache.tajo.storage.Tuple;
++import org.apache.tajo.storage.VTuple;
++import org.apache.tajo.unit.StorageUnit;
++import org.apache.tajo.util.FileUtil;
++import org.apache.tajo.util.ProtoUtil;
++import org.junit.Test;
++
++import java.nio.ByteBuffer;
++import java.util.Collections;
++import java.util.List;
++
++import static org.apache.tajo.common.TajoDataTypes.Type;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++public class TestOffHeapRowBlock {
++  private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class);
++  public static String UNICODE_FIELD_PREFIX = "abc_가나다_";
++  public static Schema schema;
++
++  static {
++    schema = new Schema();
++    schema.addColumn("col0", Type.BOOLEAN);
++    schema.addColumn("col1", Type.INT2);
++    schema.addColumn("col2", Type.INT4);
++    schema.addColumn("col3", Type.INT8);
++    schema.addColumn("col4", Type.FLOAT4);
++    schema.addColumn("col5", Type.FLOAT8);
++    schema.addColumn("col6", Type.TEXT);
++    schema.addColumn("col7", Type.TIMESTAMP);
++    schema.addColumn("col8", Type.DATE);
++    schema.addColumn("col9", Type.TIME);
++    schema.addColumn("col10", Type.INTERVAL);
++    schema.addColumn("col11", Type.INET4);
++    schema.addColumn("col12",
++        CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName()));
++  }
++
++  private void explainRowBlockAllocation(OffHeapRowBlock rowBlock, long startTime, long endTime) {
++    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
++        + (endTime - startTime) + " msec");
++  }
++
++  @Test
++  public void testPutAndReadValidation() {
++    int rowNum = 1000;
++
++    long allocStart = System.currentTimeMillis();
++    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
++    long allocEnd = System.currentTimeMillis();
++    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
++
++    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
++
++    ZeroCopyTuple tuple = new ZeroCopyTuple();
++    long writeStart = System.currentTimeMillis();
++    for (int i = 0; i < rowNum; i++) {
++      fillRow(i, rowBlock.getWriter());
++
++      reader.reset();
++      int j = 0;
++      while(reader.next(tuple)) {
++        validateTupleResult(j, tuple);
++
++        j++;
++      }
++    }
++    long writeEnd = System.currentTimeMillis();
++    LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
++
++    long readStart = System.currentTimeMillis();
++    tuple = new ZeroCopyTuple();
++    int j = 0;
++    reader.reset();
++    while(reader.next(tuple)) {
++      validateTupleResult(j, tuple);
++      j++;
++    }
++    long readEnd = System.currentTimeMillis();
++    LOG.info("reading takes " + (readEnd - readStart) + " msec");
++
++    rowBlock.release();
++  }
++
++  @Test
++  public void testNullityValidation() {
++    int rowNum = 1000;
++
++    long allocStart = System.currentTimeMillis();
++    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
++    long allocEnd = System.currentTimeMillis();
++    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
++
++    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
++    ZeroCopyTuple tuple = new ZeroCopyTuple();
++    long writeStart = System.currentTimeMillis();
++    for (int i = 0; i < rowNum; i++) {
++      fillRowBlockWithNull(i, rowBlock.getWriter());
++
++      reader.reset();
++      int j = 0;
++      while(reader.next(tuple)) {
++        validateNullity(j, tuple);
++
++        j++;
++      }
++    }
++    long writeEnd = System.currentTimeMillis();
++    LOG.info("writing and nullity validating take " + (writeEnd - writeStart) + " msec");
++
++    long readStart = System.currentTimeMillis();
++    tuple = new ZeroCopyTuple();
++    int j = 0;
++    reader.reset();
++    while(reader.next(tuple)) {
++      validateNullity(j, tuple);
++
++      j++;
++    }
++    long readEnd = System.currentTimeMillis();
++    LOG.info("reading takes " + (readEnd - readStart) + " msec");
++
++    rowBlock.release();
++  }
++
++  @Test
++  public void testEmptyRow() {
++    int rowNum = 1000;
++
++    long allocStart = System.currentTimeMillis();
++    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 10);
++    long allocEnd = System.currentTimeMillis();
++    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
++
++    long writeStart = System.currentTimeMillis();
++    for (int i = 0; i < rowNum; i++) {
++      rowBlock.getWriter().startRow();
++      // empty columns
++      rowBlock.getWriter().endRow();
++    }
++    long writeEnd = System.currentTimeMillis();
++    LOG.info("writing took " + (writeEnd - writeStart) + " msec");
++
++    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
++
++    long readStart = System.currentTimeMillis();
++    ZeroCopyTuple tuple = new ZeroCopyTuple();
++    int j = 0;
++    reader.reset();
++    while(reader.next(tuple)) {
++      j++;
++    }
++    long readEnd = System.currentTimeMillis();
++    LOG.info("reading takes " + (readEnd - readStart) + " msec");
++    rowBlock.release();
++
++    assertEquals(rowNum, j);
++    assertEquals(rowNum, rowBlock.rows());
++  }
++
++  @Test
++  public void testSortBenchmark() {
++    int rowNum = 1000;
++
++    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
++    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
++
++    List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList();
++
++    long readStart = System.currentTimeMillis();
++    ZeroCopyTuple tuple = new ZeroCopyTuple();
++    reader.reset();
++    while(reader.next(tuple)) {
++      unSafeTuples.add(tuple);
++      tuple = new ZeroCopyTuple();
++    }
++    long readEnd = System.currentTimeMillis();
++    LOG.info("reading takes " + (readEnd - readStart) + " msec");
++
++    SortSpec sortSpec = new SortSpec(new Column("col2", Type.INT4));
++    BaseTupleComparator comparator = new BaseTupleComparator(schema, new SortSpec[] {sortSpec});
++
++    long sortStart = System.currentTimeMillis();
++    Collections.sort(unSafeTuples, comparator);
++    long sortEnd = System.currentTimeMillis();
++    LOG.info("sorting took " + (sortEnd - sortStart) + " msec");
++    rowBlock.release();
++  }
++
++  @Test
++  public void testVTuplePutAndGetBenchmark() {
++    int rowNum = 1000;
++
++    List<VTuple> rowBlock = Lists.newArrayList();
++    long writeStart = System.currentTimeMillis();
++    VTuple tuple;
++    for (int i = 0; i < rowNum; i++) {
++      tuple = new VTuple(schema.size());
++      fillVTuple(i, tuple);
++      rowBlock.add(tuple);
++    }
++    long writeEnd = System.currentTimeMillis();
++    LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
++
++    long readStart = System.currentTimeMillis();
++    int j = 0;
++    for (VTuple t : rowBlock) {
++      validateTupleResult(j, t);
++      j++;
++    }
++    long readEnd = System.currentTimeMillis();
++    LOG.info("reading takes " + (readEnd - readStart) + " msec");
++
++    int count = 0;
++    for (int l = 0; l < rowBlock.size(); l++) {
++      for(int m = 0; m < schema.size(); m++ ) {
++        if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) {
++          count ++;
++        }
++      }
++    }
++    // Log the count so the JIT cannot eliminate the loops above as dead code.
++    LOG.info("The number of INT4 values is " + count + ".");
++  }
++
++  @Test
++  public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() {
++    int rowNum = 1000;
++
++    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 100);
++
++    long writeStart = System.currentTimeMillis();
++    VTuple tuple = new VTuple(schema.size());
++    for (int i = 0; i < rowNum; i++) {
++      fillVTuple(i, tuple);
++
++      RowStoreUtil.convert(tuple, rowBlock.getWriter());
++    }
++    long writeEnd = System.currentTimeMillis();
++    LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
++
++    validateResults(rowBlock);
++    rowBlock.release();
++  }
++
++  @Test
++  public void testSerDerOfRowBlock() {
++    int rowNum = 1000;
++
++    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
++
++    ByteBuffer bb = rowBlock.nioBuffer();
++    OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb);
++    validateResults(restoredRowBlock);
++    rowBlock.release();
++  }
++
++  @Test
++  public void testSerDerOfZeroCopyTuple() {
++    int rowNum = 1000;
++
++    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
++
++    ByteBuffer bb = rowBlock.nioBuffer();
++    OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb);
++    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(restoredRowBlock);
++
++    long readStart = System.currentTimeMillis();
++    ZeroCopyTuple tuple = new ZeroCopyTuple();
++    ZeroCopyTuple copyTuple = new ZeroCopyTuple();
++    int j = 0;
++    reader.reset();
++    while(reader.next(tuple)) {
++      ByteBuffer copy = tuple.nioBuffer();
++      copyTuple.set(copy, SchemaUtil.toDataTypes(schema));
++
++      validateTupleResult(j, copyTuple);
++
++      j++;
++    }
++    long readEnd = System.currentTimeMillis();
++    LOG.info("reading takes " + (readEnd - readStart) + " msec");
++
++    rowBlock.release();
++  }
++
++  public static OffHeapRowBlock createRowBlock(int rowNum) {
++    long allocateStart = System.currentTimeMillis();
++    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
++    long allocatedEnd = System.currentTimeMillis();
++    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
++        + (allocatedEnd - allocateStart) + " msec");
++
++    long writeStart = System.currentTimeMillis();
++    for (int i = 0; i < rowNum; i++) {
++      fillRow(i, rowBlock.getWriter());
++    }
++    long writeEnd = System.currentTimeMillis();
++    LOG.info("writing takes " + (writeEnd - writeStart) + " msec");
++
++    return rowBlock;
++  }
++
++  public static OffHeapRowBlock createRowBlockWithNull(int rowNum) {
++    long allocateStart = System.currentTimeMillis();
++    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
++    long allocatedEnd = System.currentTimeMillis();
++    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
++        + (allocatedEnd - allocateStart) + " msec");
++
++    long writeStart = System.currentTimeMillis();
++    for (int i = 0; i < rowNum; i++) {
++      fillRowBlockWithNull(i, rowBlock.getWriter());
++    }
++    long writeEnd = System.currentTimeMillis();
++    LOG.info("writing takes " + (writeEnd - writeStart) + " msec");
++
++    return rowBlock;
++  }
++
++  public static void fillRow(int i, RowWriter builder) {
++    builder.startRow();
++    builder.putBool(i % 1 == 0);                // 0
++    builder.putInt2((short) 1);                 // 1
++    builder.putInt4(i);                         // 2
++    builder.putInt8(i);                         // 3
++    builder.putFloat4(i);                       // 4
++    builder.putFloat8(i);                       // 5
++    builder.putText((UNICODE_FIELD_PREFIX + i).getBytes());  // 6
++    builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
++    builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
++    builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
++    builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
++    builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
++    builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
++    builder.endRow();
++  }
++
++  public static void fillRowBlockWithNull(int i, RowWriter writer) {
++    writer.startRow();
++
++    if (i == 0) {
++      writer.skipField();
++    } else {
++      writer.putBool(i % 1 == 0);              // 0
++    }
++    if (i % 1 == 0) {
++      writer.skipField();
++    } else {
++      writer.putInt2((short) 1);                 // 1
++    }
++
++    if (i % 2 == 0) {
++      writer.skipField();
++    } else {
++      writer.putInt4(i);                         // 2
++    }
++
++    if (i % 3 == 0) {
++      writer.skipField();
++    } else {
++      writer.putInt8(i);                         // 3
++    }
++
++    if (i % 4 == 0) {
++      writer.skipField();
++    } else {
++      writer.putFloat4(i);                       // 4
++    }
++
++    if (i % 5 == 0) {
++      writer.skipField();
++    } else {
++      writer.putFloat8(i);                       // 5
++    }
++
++    if (i % 6 == 0) {
++      writer.skipField();
++    } else {
++      writer.putText((UNICODE_FIELD_PREFIX + i).getBytes());  // 6
++    }
++
++    if (i % 7 == 0) {
++      writer.skipField();
++    } else {
++      writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
++    }
++
++    if (i % 8 == 0) {
++      writer.skipField();
++    } else {
++      writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
++    }
++
++    if (i % 9 == 0) {
++      writer.skipField();
++    } else {
++      writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
++    }
++
++    if (i % 10 == 0) {
++      writer.skipField();
++    } else {
++      writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
++    }
++
++    if (i % 11 == 0) {
++      writer.skipField();
++    } else {
++      writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
++    }
++
++    if (i % 12 == 0) {
++      writer.skipField();
++    } else {
++      writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
++    }
++
++    writer.endRow();
++  }
++
++  public static void fillVTuple(int i, VTuple tuple) {
++    tuple.put(0, DatumFactory.createBool(i % 1 == 0));
++    tuple.put(1, DatumFactory.createInt2((short) 1));
++    tuple.put(2, DatumFactory.createInt4(i));
++    tuple.put(3, DatumFactory.createInt8(i));
++    tuple.put(4, DatumFactory.createFloat4(i));
++    tuple.put(5, DatumFactory.createFloat8(i));
++    tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes()));
++    tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7
++    tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8
++    tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9
++    tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10
++    tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11
++    tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
++  }
++
++  public static void validateResults(OffHeapRowBlock rowBlock) {
++    long readStart = System.currentTimeMillis();
++    ZeroCopyTuple tuple = new ZeroCopyTuple();
++    int j = 0;
++    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
++    reader.reset();
++    while(reader.next(tuple)) {
++      validateTupleResult(j, tuple);
++      j++;
++    }
++    long readEnd = System.currentTimeMillis();
++    LOG.info("Reading takes " + (readEnd - readStart) + " msec");
++  }
++
++  public static void validateTupleResult(int j, Tuple t) {
++    assertTrue((j % 1 == 0) == t.getBool(0));
++    assertTrue(1 == t.getInt2(1));
++    assertEquals(j, t.getInt4(2));
++    assertEquals(j, t.getInt8(3));
++    assertTrue(j == t.getFloat4(4));
++    assertTrue(j == t.getFloat8(5));
++    assertEquals(UNICODE_FIELD_PREFIX + j, t.getText(6));
++    assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7));
++    assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8));
++    assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9));
++    assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10));
++    assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11));
++    assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12));
++  }
++
++  public static void validateNullity(int j, Tuple tuple) {
++    if (j == 0) {
++      assertTrue(tuple.isNull(0));
++    } else {
++      assertTrue((j % 1 == 0) == tuple.getBool(0));
++    }
++
++    if (j % 1 == 0) {
++      assertTrue(tuple.isNull(1));
++    } else {
++      assertTrue(1 == tuple.getInt2(1));
++    }
++
++    if (j % 2 == 0) {
++      assertTrue(tuple.isNull(2));
++    } else {
++      assertEquals(j, tuple.getInt4(2));
++    }
++
++    if (j % 3 == 0) {
++      assertTrue(tuple.isNull(3));
++    } else {
++      assertEquals(j, tuple.getInt8(3));
++    }
++
++    if (j % 4 == 0) {
++      assertTrue(tuple.isNull(4));
++    } else {
++      assertTrue(j == tuple.getFloat4(4));
++    }
++
++    if (j % 5 == 0) {
++      assertTrue(tuple.isNull(5));
++    } else {
++      assertTrue(j == tuple.getFloat8(5));
++    }
++
++    if (j % 6 == 0) {
++      assertTrue(tuple.isNull(6));
++    } else {
++      assertEquals(UNICODE_FIELD_PREFIX + j, tuple.getText(6));
++    }
++
++    if (j % 7 == 0) {
++      assertTrue(tuple.isNull(7));
++    } else {
++      assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7));
++    }
++
++    if (j % 8 == 0) {
++      assertTrue(tuple.isNull(8));
++    } else {
++      assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8));
++    }
++
++    if (j % 9 == 0) {
++      assertTrue(tuple.isNull(9));
++    } else {
++      assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9));
++    }
++
++    if (j % 10 == 0) {
++      assertTrue(tuple.isNull(10));
++    } else {
++      assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10));
++    }
++
++    if (j % 11 == 0) {
++      assertTrue(tuple.isNull(11));
++    } else {
++      assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11));
++    }
++
++    if (j % 12 == 0) {
++      assertTrue(tuple.isNull(12));
++    } else {
++      assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12));
++    }
++  }
++}

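For readers new to this API, the test above boils down to the following round trip. This is a minimal sketch reusing only calls that appear in this diff; the tiny 1 KB initial budget mirrors testPutAndReadValidation and assumes the block grows on demand, per the resizable specs exercised in TestResizableSpec below.

import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader;
import org.apache.tajo.tuple.offheap.TestOffHeapRowBlock;
import org.apache.tajo.tuple.offheap.ZeroCopyTuple;

public class RowBlockRoundTrip {
  public static void main(String[] args) {
    // schema and fillRow() come from TestOffHeapRowBlock above
    OffHeapRowBlock rowBlock = new OffHeapRowBlock(TestOffHeapRowBlock.schema, 1024);
    try {
      for (int i = 0; i < 1000; i++) {
        TestOffHeapRowBlock.fillRow(i, rowBlock.getWriter());  // appends one row
      }
      OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
      ZeroCopyTuple tuple = new ZeroCopyTuple();  // a view into the block, not a copy
      int rows = 0;
      while (reader.next(tuple)) {
        rows++;
      }
      System.out.println(rows + " rows read back");
    } finally {
      rowBlock.release();  // off-heap memory is not reclaimed by the GC
    }
  }
}
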
http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
----------------------------------------------------------------------
diff --cc 
tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
index 0000000,0000000..1eb9c17
new file mode 100644
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
@@@ -1,0 -1,0 +1,59 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.tuple.offheap;
++
++import org.apache.tajo.unit.StorageUnit;
++import org.junit.Test;
++
++import static org.junit.Assert.*;
++
++public class TestResizableSpec {
++
++  @Test
++  public void testResizableLimit() {
++    ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f);
++
++    long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
++
++    assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
++
++    assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB));
++
++    assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB));
++
++    assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1);
++
++    assertFalse(limit.canIncrease(limit.limit()));
++  }
++
++  @Test
++  public void testFixedLimit() {
++    FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f);
++
++    assertEquals(limit.limit(), 100 * StorageUnit.MB);
++
++    assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000));
++
++    assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB));
++
++    assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB));
++
++    assertFalse(limit.canIncrease(limit.limit()));
++  }
++}

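The assertions above pin down the growth arithmetic. As a worked sketch of the math only; reading the constructor arguments as initial size, soft maximum, and allowed overflow ratio is inferred from the test, not documented in this diff:

public class LimitSpecMath {
  public static void main(String[] args) {
    long MB = 1024L * 1024;                // StorageUnit.MB
    long softMax = 1000 * MB;              // the 1000 MB constructor argument

    // limit() = soft maximum plus a 10% overflow allowance (the 0.1f argument)
    long hardLimit = (long) (softMax + softMax * 0.1f);
    System.out.println(hardLimit);         // 1153433600, matching limit()

    // increasedSize() doubles a size below the limit: 10 MB -> 20971520 bytes
    System.out.println(2 * 10 * MB);       // 20971520

    // and clamps requests past the limit: increasedSize(1600 MB) == hardLimit
  }
}
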
http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
index 0000000,0000000..0b3755d
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
@@@ -1,0 -1,0 +1,37 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage;
++
++import io.netty.buffer.ByteBuf;
++import org.apache.tajo.catalog.Column;
++import org.apache.tajo.datum.Datum;
++import org.apache.tajo.storage.text.TextLineParsingError;
++
++import java.io.IOException;
++import java.io.OutputStream;
++
++
++public interface FieldSerializerDeserializer {
++
++  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;
++
++  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars)
++      throws IOException, TextLineParsingError;
++
++}

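Because this file ships only the interface, a minimal hypothetical implementation may help show how the two methods pair up. The class name AsciiInt4SerDe and the text encoding are illustrative, not part of this commit; only the method signatures come from the diff above.

import io.netty.buffer.ByteBuf;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.FieldSerializerDeserializer;
import org.apache.tajo.storage.text.TextLineParsingError;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;

public class AsciiInt4SerDe implements FieldSerializerDeserializer {
  private static final Charset UTF8 = Charset.forName("UTF-8");

  @Override
  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars)
      throws IOException {
    // write the null marker for null fields, the decimal text otherwise
    byte[] bytes = datum.isNull() ? nullChars : datum.asChars().getBytes(UTF8);
    out.write(bytes);
    return bytes.length;  // serialize() reports how many bytes it wrote
  }

  @Override
  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars)
      throws IOException, TextLineParsingError {
    String text = buf.toString(UTF8);
    return DatumFactory.createInt4(Integer.parseInt(text.trim()));
  }
}
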
http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index 85f91cc,0000000..dbb8bd0
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@@ -1,219 -1,0 +1,220 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.storage.avro;
 +
 +import org.apache.avro.Schema;
 +import org.apache.avro.file.DataFileWriter;
 +import org.apache.avro.generic.GenericData;
 +import org.apache.avro.generic.GenericDatumWriter;
 +import org.apache.avro.generic.GenericRecord;
 +import org.apache.avro.io.DatumWriter;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.tajo.QueryUnitAttemptId;
 +import org.apache.tajo.catalog.Column;
 +import org.apache.tajo.catalog.TableMeta;
 +import org.apache.tajo.catalog.statistics.TableStats;
 +import org.apache.tajo.datum.NullDatum;
 +import org.apache.tajo.storage.FileAppender;
 +import org.apache.tajo.storage.TableStatistics;
 +import org.apache.tajo.storage.Tuple;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.List;
 +
 +/**
 + * FileAppender for writing to Avro files.
 + */
 +public class AvroAppender extends FileAppender {
 +  private TableStatistics stats;
 +  private Schema avroSchema;
 +  private List<Schema.Field> avroFields;
 +  private DataFileWriter<GenericRecord> dataFileWriter;
 +
 +  /**
 +   * Creates a new AvroAppender.
 +   *
 +   * @param conf Configuration properties.
++   * @param taskAttemptId The task attempt id
 +   * @param schema The table schema.
 +   * @param meta The table metadata.
 +   * @param workDir The path of the Avro file to write to.
 +   */
 +  public AvroAppender(Configuration conf,
 +                      QueryUnitAttemptId taskAttemptId,
 +                      org.apache.tajo.catalog.Schema schema,
 +                      TableMeta meta, Path workDir) throws IOException {
 +    super(conf, taskAttemptId, schema, meta, workDir);
 +  }
 +
 +  /**
 +   * Initializes the Appender.
 +   */
 +  public void init() throws IOException {
 +    FileSystem fs = path.getFileSystem(conf);
 +    if (!fs.exists(path.getParent())) {
 +      throw new FileNotFoundException(path.toString());
 +    }
 +    FSDataOutputStream outputStream = fs.create(path);
 +
 +    avroSchema = AvroUtil.getAvroSchema(meta, conf);
 +    avroFields = avroSchema.getFields();
 +
 +    DatumWriter<GenericRecord> datumWriter =
 +        new GenericDatumWriter<GenericRecord>(avroSchema);
 +    dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
 +    dataFileWriter.create(avroSchema, outputStream);
 +
 +    if (enabledStats) {
 +      this.stats = new TableStatistics(schema);
 +    }
 +    super.init();
 +  }
 +
 +  /**
 +   * Gets the current offset. Tracking offsets is currently not implemented, so
 +   * this method always returns 0.
 +   *
 +   * @return 0
 +   */
 +  @Override
 +  public long getOffset() throws IOException {
 +    return 0;
 +  }
 +
 +  private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
 +    if (tuple.get(i) instanceof NullDatum) {
 +      return null;
 +    }
 +    switch (avroType) {
 +      case NULL:
 +        return null;
 +      case BOOLEAN:
 +        return tuple.getBool(i);
 +      case INT:
 +        return tuple.getInt4(i);
 +      case LONG:
 +        return tuple.getInt8(i);
 +      case FLOAT:
 +        return tuple.getFloat4(i);
 +      case DOUBLE:
 +        return tuple.getFloat8(i);
 +      case BYTES:
 +      case FIXED:
 +        return ByteBuffer.wrap(tuple.getBytes(i));
 +      case STRING:
 +        return tuple.getText(i);
 +      default:
 +        throw new RuntimeException("Unknown primitive type.");
 +    }
 +  }
 +
 +  /**
 +   * Write a Tuple to the Avro file.
 +   *
 +   * @param tuple The Tuple to write.
 +   */
 +  @Override
 +  public void addTuple(Tuple tuple) throws IOException {
 +    GenericRecord record = new GenericData.Record(avroSchema);
 +    for (int i = 0; i < schema.size(); ++i) {
 +      Column column = schema.getColumn(i);
 +      if (enabledStats) {
 +        stats.analyzeField(i, tuple.get(i));
 +      }
 +      Object value;
 +      Schema.Field avroField = avroFields.get(i);
 +      Schema.Type avroType = avroField.schema().getType();
 +      switch (avroType) {
 +        case NULL:
 +        case BOOLEAN:
 +        case INT:
 +        case LONG:
 +        case FLOAT:
 +        case DOUBLE:
 +        case BYTES:
 +        case STRING:
 +        case FIXED:
 +          value = getPrimitive(tuple, i, avroType);
 +          break;
 +        case RECORD:
 +          throw new RuntimeException("Avro RECORD not supported.");
 +        case ENUM:
 +          throw new RuntimeException("Avro ENUM not supported.");
 +        case MAP:
 +          throw new RuntimeException("Avro MAP not supported.");
 +        case UNION:
 +          List<Schema> schemas = avroField.schema().getTypes();
 +          if (schemas.size() != 2) {
 +            throw new RuntimeException("Avro UNION not supported.");
 +          }
 +          if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
 +            value = getPrimitive(tuple, i, schemas.get(1).getType());
 +          } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
 +            value = getPrimitive(tuple, i, schemas.get(0).getType());
 +          } else {
 +            throw new RuntimeException("Avro UNION not supported.");
 +          }
 +          break;
 +        default:
 +          throw new RuntimeException("Unknown type: " + avroType);
 +      }
 +      record.put(i, value);
 +    }
 +    dataFileWriter.append(record);
 +
 +    if (enabledStats) {
 +      stats.incrementRow();
 +    }
 +  }
 +
 +  /**
 +   * Flushes the current state of the file.
 +   */
 +  @Override
 +  public void flush() throws IOException {
 +    dataFileWriter.flush();
 +  }
 +
 +  /**
 +   * Closes the Appender.
 +   */
 +  @Override
 +  public void close() throws IOException {
 +    dataFileWriter.close();
 +  }
 +
 +  /**
 +   * If table statistics is enabled, retrieve the table statistics.
 +   *
 +   * @return Table statistics if enabled or null otherwise.
 +   */
 +  @Override
 +  public TableStats getStats() {
 +    if (enabledStats) {
 +      return stats.getTableStat();
 +    } else {
 +      return null;
 +    }
 +  }
 +}

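A lifecycle sketch for the appender above: the constructor, init(), addTuple(), and close() are the methods defined in this diff, while the table-meta setup (CatalogUtil.newTableMeta and the 'avro.schema.literal' property key) follows the usual Tajo pattern and should be treated as an assumption, since it is not shown here.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.storage.avro.AvroAppender;

public class AvroWriteSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);

    TableMeta meta = CatalogUtil.newTableMeta(StoreType.AVRO);  // assumed helper
    meta.putOption("avro.schema.literal",                       // assumed property key
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");

    AvroAppender appender = new AvroAppender(
        new Configuration(), null, schema, meta, new Path("/tmp/sample.avro"));
    appender.init();                        // resolves the Avro schema, opens the file

    VTuple tuple = new VTuple(schema.size());
    tuple.put(0, DatumFactory.createInt4(42));
    appender.addTuple(tuple);               // maps the Tajo tuple onto a GenericRecord
    appender.close();
  }
}
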
http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
index c1f63a8,0000000..dfe36f6
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
@@@ -1,220 -1,0 +1,225 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.storage.json;
 +
 +
 +import io.netty.buffer.ByteBuf;
 +import net.minidev.json.JSONArray;
 +import net.minidev.json.JSONObject;
 +import net.minidev.json.parser.JSONParser;
++import net.minidev.json.parser.ParseException;
 +import org.apache.tajo.catalog.Schema;
 +import org.apache.tajo.catalog.SchemaUtil;
 +import org.apache.tajo.catalog.TableMeta;
 +import org.apache.tajo.common.TajoDataTypes.Type;
 +import org.apache.tajo.common.exception.NotImplementedException;
 +import org.apache.tajo.datum.DatumFactory;
 +import org.apache.tajo.datum.NullDatum;
++import org.apache.tajo.datum.TextDatum;
++import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
 +import org.apache.tajo.storage.Tuple;
 +import org.apache.tajo.storage.text.TextLineDeserializer;
 +import org.apache.tajo.storage.text.TextLineParsingError;
 +
 +import java.io.IOException;
 +import java.util.Iterator;
 +
 +public class JsonLineDeserializer extends TextLineDeserializer {
 +  private JSONParser parser;
 +  private Type [] types;
 +  private String [] columnNames;
 +
 +  public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
 +    super(schema, meta, targetColumnIndexes);
 +  }
 +
 +  @Override
 +  public void init() {
 +    types = SchemaUtil.toTypes(schema);
 +    columnNames = SchemaUtil.toSimpleNames(schema);
 +
 +    parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE);
 +  }
 +
 +  @Override
 +  public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError {
 +    byte [] line = new byte[buf.readableBytes()];
 +    buf.readBytes(line);
 +
 +    try {
 +      JSONObject object = (JSONObject) parser.parse(line);
 +
 +      for (int i = 0; i < targetColumnIndexes.length; i++) {
 +        int actualIdx = targetColumnIndexes[i];
 +        String fieldName = columnNames[actualIdx];
 +
 +        if (!object.containsKey(fieldName)) {
 +          output.put(actualIdx, NullDatum.get());
 +          continue;
 +        }
 +
 +        switch (types[actualIdx]) {
 +        case BOOLEAN:
 +          String boolStr = object.getAsString(fieldName);
 +          if (boolStr != null) {
 +            output.put(actualIdx, DatumFactory.createBool(boolStr.equals("true")));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +        case CHAR:
 +          String charStr = object.getAsString(fieldName);
 +          if (charStr != null) {
 +            output.put(actualIdx, DatumFactory.createChar(charStr));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +        case INT1:
 +        case INT2:
 +          Number int2Num = object.getAsNumber(fieldName);
 +          if (int2Num != null) {
 +            output.put(actualIdx, DatumFactory.createInt2(int2Num.shortValue()));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +        case INT4:
 +          Number int4Num = object.getAsNumber(fieldName);
 +          if (int4Num != null) {
 +            output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue()));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +        case INT8:
 +          Number int8Num = object.getAsNumber(fieldName);
 +          if (int8Num != null) {
 +            output.put(actualIdx, DatumFactory.createInt8(int8Num.longValue()));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +        case FLOAT4:
 +          Number float4Num = object.getAsNumber(fieldName);
 +          if (float4Num != null) {
 +            output.put(actualIdx, DatumFactory.createFloat4(float4Num.floatValue()));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +        case FLOAT8:
 +          Number float8Num = object.getAsNumber(fieldName);
 +          if (float8Num != null) {
 +            output.put(actualIdx, DatumFactory.createFloat8(float8Num.doubleValue()));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +        case TEXT:
 +          String textStr = object.getAsString(fieldName);
 +          if (textStr != null) {
 +            output.put(actualIdx, DatumFactory.createText(textStr));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +        case TIMESTAMP:
 +          String timestampStr = object.getAsString(fieldName);
 +          if (timestampStr != null) {
 +            output.put(actualIdx, DatumFactory.createTimestamp(timestampStr));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +        case TIME:
 +          String timeStr = object.getAsString(fieldName);
 +          if (timeStr != null) {
 +            output.put(actualIdx, DatumFactory.createTime(timeStr));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +        case DATE:
 +          String dateStr = object.getAsString(fieldName);
 +          if (dateStr != null) {
 +            output.put(actualIdx, DatumFactory.createDate(dateStr));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +        case BIT:
 +        case BINARY:
 +        case VARBINARY:
 +        case BLOB: {
 +          Object jsonObject = object.get(fieldName);
 +
 +          if (jsonObject == null) {
 +            output.put(actualIdx, NullDatum.get());
 +            break;
-           } if (jsonObject instanceof String) {
-             output.put(actualIdx, DatumFactory.createBlob((String)jsonObject));
++          }
++          if (jsonObject instanceof String) {
++            output.put(actualIdx, DatumFactory.createBlob((String) jsonObject));
 +          } else if (jsonObject instanceof JSONArray) {
 +            JSONArray jsonArray = (JSONArray) jsonObject;
 +            byte[] bytes = new byte[jsonArray.size()];
 +            Iterator<Object> it = jsonArray.iterator();
 +            int arrayIdx = 0;
 +            while (it.hasNext()) {
 +              bytes[arrayIdx++] = ((Long) it.next()).byteValue();
 +            }
 +            if (bytes.length > 0) {
 +              output.put(actualIdx, DatumFactory.createBlob(bytes));
 +            } else {
 +              output.put(actualIdx, NullDatum.get());
 +            }
 +            break;
 +          } else {
 +            throw new IOException("Unknown json object: " + jsonObject.getClass().getSimpleName());
 +          }
 +          break;
 +        }
 +        case INET4:
 +          String inetStr = object.getAsString(fieldName);
 +          if (inetStr != null) {
 +            output.put(actualIdx, DatumFactory.createInet4(inetStr));
 +          } else {
 +            output.put(actualIdx, NullDatum.get());
 +          }
 +          break;
 +
 +        case NULL_TYPE:
 +          output.put(actualIdx, NullDatum.get());
 +          break;
 +
 +        default:
 +          throw new NotImplementedException(types[actualIdx].name() + " is not supported.");
 +        }
 +      }
- 
++    } catch (ParseException pe) {
++      throw new TextLineParsingError(new String(line, TextDatum.DEFAULT_CHARSET), pe);
 +    } catch (Throwable e) {
 +      throw new IOException(e);
 +    }
 +  }
 +
 +  @Override
 +  public void release() {
 +  }
 +}

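The substantive change above is the new catch clause: malformed JSON now surfaces as TextLineParsingError rather than a bare IOException, so the scanner's error-tolerance logic (see DelimitedTextFile below) can count and skip it. A driving sketch, with the TableMeta passed in because its construction lies outside this diff:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.storage.json.JsonLineDeserializer;
import org.apache.tajo.storage.text.TextLineParsingError;

public class JsonLineSketch {
  public static void deserializeOne(TableMeta meta) throws Exception {
    Schema schema = new Schema();
    schema.addColumn("name", Type.TEXT);
    schema.addColumn("age", Type.INT4);

    JsonLineDeserializer de = new JsonLineDeserializer(schema, meta, new int[]{0, 1});
    de.init();

    ByteBuf line = Unpooled.wrappedBuffer("{\"name\": \"tajo\", \"age\": 5}".getBytes("UTF-8"));
    VTuple tuple = new VTuple(schema.size());
    try {
      de.deserialize(line, tuple);  // fills positions 0 and 1; absent keys become NULL
    } catch (TextLineParsingError e) {
      // malformed JSON lands here (the new catch clause above), so callers
      // can count it against an error-tolerance budget instead of aborting
    }
  }
}
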
http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 7848198,0000000..8824e3e
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@@ -1,475 -1,0 +1,481 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.storage.text;
 +
 +import io.netty.buffer.ByteBuf;
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.IOUtils;
 +import org.apache.hadoop.io.compress.CompressionCodec;
 +import org.apache.hadoop.io.compress.CompressionCodecFactory;
 +import org.apache.hadoop.io.compress.CompressionOutputStream;
 +import org.apache.hadoop.io.compress.Compressor;
 +import org.apache.tajo.QueryUnitAttemptId;
 +import org.apache.tajo.catalog.Schema;
 +import org.apache.tajo.catalog.TableMeta;
 +import org.apache.tajo.catalog.statistics.TableStats;
 +import org.apache.tajo.storage.*;
 +import org.apache.tajo.storage.compress.CodecPool;
 +import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 +import org.apache.tajo.storage.fragment.FileFragment;
 +import org.apache.tajo.storage.fragment.Fragment;
 +import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
 +import org.apache.tajo.util.ReflectionUtil;
 +
 +import java.io.BufferedOutputStream;
 +import java.io.DataOutputStream;
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.Map;
 +import java.util.concurrent.ConcurrentHashMap;
 +
 +import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM;
 +import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM;
 +
 +public class DelimitedTextFile {
 +
 +  public static final byte LF = '\n';
 +
 +  private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
 +
 +  /** it caches line serde classes. */
 +  private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache =
 +      new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>();
 +
 +  /**
 +   * By default, DelimitedTextFileScanner uses CSVLineSerDe. If a table property 'text.serde.class' is given,
 +   * it will use the specified SerDe class.
 +   *
 +   * @return TextLineSerDe
 +   */
 +  public static TextLineSerDe getLineSerde(TableMeta meta) {
 +    TextLineSerDe lineSerder;
 +
 +    String serDeClassName;
 +
 +    // if no serde class is given, it will use the CSV line SerDe.
 +    serDeClassName = meta.getOption(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_TEXT_SERDE_CLASS);
 +
 +    try {
 +      Class<? extends TextLineSerDe> serdeClass;
 +
 +      if (serdeClassCache.containsKey(serDeClassName)) {
 +        serdeClass = serdeClassCache.get(serDeClassName);
 +      } else {
 +        serdeClass = (Class<? extends TextLineSerDe>) Class.forName(serDeClassName);
 +        serdeClassCache.put(serDeClassName, serdeClass);
 +      }
 +      lineSerder = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
 +    } catch (Throwable e) {
 +      throw new RuntimeException("TextLineSerde class cannot be initialized.", e);
 +    }
 +
 +    return lineSerder;
 +  }
 +
 +  public static class DelimitedTextFileAppender extends FileAppender {
 +    private final TableMeta meta;
 +    private final Schema schema;
 +    private final FileSystem fs;
 +    private FSDataOutputStream fos;
 +    private DataOutputStream outputStream;
 +    private CompressionOutputStream deflateFilter;
 +    private TableStatistics stats = null;
 +    private Compressor compressor;
 +    private CompressionCodecFactory codecFactory;
 +    private CompressionCodec codec;
 +    private Path compressedPath;
 +    private byte[] nullChars;
 +    private int BUFFER_SIZE = 128 * 1024;
 +    private int bufferedBytes = 0;
 +    private long pos = 0;
 +
 +    private NonSyncByteArrayOutputStream os;
 +    private TextLineSerializer serializer;
 +
 +    public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
 +                                     final Schema schema, final TableMeta meta, final Path path)
 +        throws IOException {
 +      super(conf, taskAttemptId, schema, meta, path);
 +      this.fs = path.getFileSystem(conf);
 +      this.meta = meta;
 +      this.schema = schema;
 +    }
 +
 +    public TextLineSerDe getLineSerde() {
 +      return DelimitedTextFile.getLineSerde(meta);
 +    }
 +
 +    @Override
 +    public void init() throws IOException {
 +      if (!fs.exists(path.getParent())) {
 +        throw new FileNotFoundException(path.toString());
 +      }
 +
 +      if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
 +        String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
 +        codecFactory = new CompressionCodecFactory(conf);
 +        codec = codecFactory.getCodecByClassName(codecName);
 +        compressor = CodecPool.getCompressor(codec);
 +        if (compressor != null) compressor.reset();  //builtin gzip is null
 +
 +        String extension = codec.getDefaultExtension();
 +        compressedPath = path.suffix(extension);
 +
 +        if (fs.exists(compressedPath)) {
 +          throw new AlreadyExistsStorageException(compressedPath);
 +        }
 +
 +        fos = fs.create(compressedPath);
 +        deflateFilter = codec.createOutputStream(fos, compressor);
 +        outputStream = new DataOutputStream(deflateFilter);
 +
 +      } else {
 +        if (fs.exists(path)) {
 +          throw new AlreadyExistsStorageException(path);
 +        }
 +        fos = fs.create(path);
 +        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
 +      }
 +
 +      if (enabledStats) {
 +        this.stats = new TableStatistics(this.schema);
 +      }
 +
 +      serializer = getLineSerde().createSerializer(schema, meta);
 +      serializer.init();
 +
 +      if (os == null) {
 +        os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
 +      }
 +
 +      os.reset();
 +      pos = fos.getPos();
 +      bufferedBytes = 0;
 +      super.init();
 +    }
 +
 +    @Override
 +    public void addTuple(Tuple tuple) throws IOException {
 +      // write
 +      int rowBytes = serializer.serialize(os, tuple);
 +
 +      // new line
 +      os.write(LF);
 +      rowBytes += 1;
 +
 +      // update positions
 +      pos += rowBytes;
 +      bufferedBytes += rowBytes;
 +
 +      // flush the buffer once it exceeds BUFFER_SIZE
 +      if (bufferedBytes > BUFFER_SIZE) {
 +        flushBuffer();
 +      }
 +      // Statistical section
 +      if (enabledStats) {
 +        stats.incrementRow();
 +      }
 +    }
 +
 +    private void flushBuffer() throws IOException {
 +      if (os.getLength() > 0) {
 +        os.writeTo(outputStream);
 +        os.reset();
 +        bufferedBytes = 0;
 +      }
 +    }
 +
 +    @Override
 +    public long getOffset() throws IOException {
 +      return pos;
 +    }
 +
 +    @Override
 +    public void flush() throws IOException {
 +      flushBuffer();
 +      outputStream.flush();
 +    }
 +
 +    @Override
 +    public void close() throws IOException {
 +
 +      try {
 +        serializer.release();
 +
 +        if(outputStream != null){
 +          flush();
 +        }
 +
 +        // Statistical section
 +        if (enabledStats) {
 +          stats.setNumBytes(getOffset());
 +        }
 +
 +        if (deflateFilter != null) {
 +          deflateFilter.finish();
 +          deflateFilter.resetState();
 +          deflateFilter = null;
 +        }
 +
 +        os.close();
 +      } finally {
 +        IOUtils.cleanup(LOG, fos);
 +        if (compressor != null) {
 +          CodecPool.returnCompressor(compressor);
 +          compressor = null;
 +        }
 +      }
 +    }
 +
 +    @Override
 +    public TableStats getStats() {
 +      if (enabledStats) {
 +        return stats.getTableStat();
 +      } else {
 +        return null;
 +      }
 +    }
 +
 +    public boolean isCompress() {
 +      return compressor != null;
 +    }
 +
 +    public String getExtension() {
 +      return codec != null ? codec.getDefaultExtension() : "";
 +    }
 +  }
 +
 +  public static class DelimitedTextFileScanner extends FileScanner {
 +    private boolean splittable = false;
 +    private final long startOffset;
 +
 +    private final long endOffset;
 +    /** The number of actually read records */
 +    private int recordCount = 0;
 +    private int[] targetColumnIndexes;
 +
 +    private DelimitedLineReader reader;
 +    private TextLineDeserializer deserializer;
 +
 +    private int errorPrintOutMaxNum = 5;
 +    /** Maximum number of permissible errors */
 +    private int errorTorrenceMaxNum;
 +    /** How many errors have occurred? */
 +    private int errorNum;
 +
 +    public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
 +                                    final Fragment fragment)
 +        throws IOException {
 +      super(conf, schema, meta, fragment);
 +      reader = new DelimitedLineReader(conf, this.fragment);
 +      if (!reader.isCompressed()) {
 +        splittable = true;
 +      }
 +
 +      startOffset = this.fragment.getStartKey();
 +      endOffset = startOffset + fragment.getLength();
 +
 +      errorTorrenceMaxNum =
 +          Integer.parseInt(meta.getOption(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
 +    }
 +
 +
 +    @Override
 +    public void init() throws IOException {
 +      if (reader != null) {
 +        reader.close();
 +      }
++
 +      reader = new DelimitedLineReader(conf, fragment);
 +      reader.init();
 +      recordCount = 0;
 +
 +      if (targets == null) {
 +        targets = schema.toArray();
 +      }
 +
 +      targetColumnIndexes = new int[targets.length];
 +      for (int i = 0; i < targets.length; i++) {
 +        targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
 +      }
 +
 +      super.init();
 +      Arrays.sort(targetColumnIndexes);
 +      if (LOG.isDebugEnabled()) {
 +        LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset);
 +      }
 +
 +      if (startOffset > 0) {
 +        reader.readLine();  // skip first line;
 +      }
 +
 +      deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes);
 +      deserializer.init();
 +    }
 +
 +    public TextLineSerDe getLineSerde() {
 +      return DelimitedTextFile.getLineSerde(meta);
 +    }
 +
 +    @Override
 +    public float getProgress() {
 +      try {
 +        if (!reader.isReadable()) {
 +          return 1.0f;
 +        }
 +        long filePos = reader.getCompressedPosition();
 +        if (startOffset == filePos) {
 +          return 0.0f;
 +        } else {
 +          long readBytes = filePos - startOffset;
 +          long remainingBytes = Math.max(endOffset - filePos, 0);
 +          return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes));
 +        }
 +      } catch (IOException e) {
 +        LOG.error(e.getMessage(), e);
 +        return 0.0f;
 +      }
 +    }
 +
 +    @Override
 +    public Tuple next() throws IOException {
++      VTuple tuple;
 +
 +      if (!reader.isReadable()) {
 +        return null;
 +      }
 +
-       if (targets.length == 0) {
-         return EmptyTuple.get();
-       }
- 
-       VTuple tuple = new VTuple(schema.size());
- 
 +      try {
 +
 +        // this loop continues until one tuple is built or EOS (end of stream) is reached.
 +        do {
 +
 +          ByteBuf buf = reader.readLine();
++
++          // if no more line, then return EOT (end of tuple)
 +          if (buf == null) {
 +            return null;
 +          }
 +
-           try {
++          // If there is no required column, we just read each line
++          // and then return an empty tuple without parsing the line.
++          if (targets.length == 0) {
++            recordCount++;
++            return EmptyTuple.get();
++          }
 +
++          tuple = new VTuple(schema.size());
++
++          try {
 +            deserializer.deserialize(buf, tuple);
 +            // if a line is read normally, it exits this loop.
 +            break;
 +
 +          } catch (TextLineParsingError tae) {
 +
 +            errorNum++;
 +
 +            // suppress too many log prints, which probably cause performance degradation
 +            if (errorNum < errorPrintOutMaxNum) {
 +              LOG.warn("Ignore JSON Parse Error (" + errorNum + "): ", tae);
 +            }
 +
 +            // Only when the maximum error tolerance limit is set (i.e., errorTorrenceMaxNum >= 0),
 +            // it checks if the number of parsing errors exceeds the max limit.
 +            // Otherwise, it will ignore all parsing errors.
 +            if (errorTorrenceMaxNum >= 0 && errorNum > errorTorrenceMaxNum) {
 +              throw tae;
 +            }
 +            continue;
 +          }
 +
 +        } while (reader.isReadable()); // continue until EOS
 +
 +        // recordCount means the number of actually read records. We increment the count here.
 +        recordCount++;
 +
 +        return tuple;
 +
 +      } catch (Throwable t) {
 +        LOG.error(t);
 +        throw new IOException(t);
 +      }
 +    }
 +
 +    @Override
 +    public void reset() throws IOException {
 +      init();
 +    }
 +
 +    @Override
 +    public void close() throws IOException {
 +      try {
 +        if (deserializer != null) {
 +          deserializer.release();
 +        }
 +
 +        if (tableStats != null && reader != null) {
 +          tableStats.setReadBytes(reader.getReadBytes());  // Actual processed bytes (decompressed bytes + overhead)
 +          tableStats.setNumRows(recordCount);
 +        }
 +        if (LOG.isDebugEnabled()) {
 +          LOG.debug("DelimitedTextFileScanner processed records: " + recordCount);
 +        }
 +      } finally {
 +        IOUtils.cleanup(LOG, reader);
 +        reader = null;
 +      }
 +    }
 +
 +    @Override
 +    public boolean isProjectable() {
 +      return true;
 +    }
 +
 +    @Override
 +    public boolean isSelectable() {
 +      return false;
 +    }
 +
 +    @Override
 +    public void setSearchCondition(Object expr) {
 +    }
 +
 +    @Override
 +    public boolean isSplittable() {
 +      return splittable;
 +    }
 +
 +    @Override
 +    public TableStats getInputStats() {
 +      if (tableStats != null && reader != null) {
 +        tableStats.setReadBytes(reader.getReadBytes());  // actual processed bytes (decompressed bytes + overhead)
 +        tableStats.setNumRows(recordCount);
 +        tableStats.setNumBytes(fragment.getLength());
 +      }
 +      return tableStats;
 +    }
 +  }
 +}
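
Note on the error-tolerance logic above: the scanner rethrows a
TextLineParsingError (wrapped in an IOException) only once the error count
exceeds errorTorrenceMaxNum, which the tests below set through the table
option StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM. A minimal usage sketch,
assuming only names that appear in this commit:

    // Sketch only: -1 ignores every malformed line, 0 fails on the first
    // one, and N > 0 tolerates up to N malformed lines before rethrowing.
    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
    Scanner scanner = StorageManager.getFileStorageManager(conf)
        .getScanner(meta, schema, fragment);
    scanner.init();  // next() will now skip at most one unparsable line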

http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --cc 
tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
index 0000000,0000000..8749925
new file mode 100644
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@@ -1,0 -1,0 +1,163 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage;
++
++import com.google.common.base.Preconditions;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.tajo.catalog.CatalogUtil;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.catalog.proto.CatalogProtos;
++import org.apache.tajo.common.TajoDataTypes.Type;
++import org.apache.tajo.conf.TajoConf;
++import org.apache.tajo.datum.Datum;
++import org.apache.tajo.datum.DatumFactory;
++import org.apache.tajo.storage.fragment.FileFragment;
++import org.apache.tajo.util.FileUtil;
++import org.junit.Test;
++
++import java.io.File;
++import java.io.IOException;
++import java.net.URL;
++
++import static org.junit.Assert.*;
++
++public class TestDelimitedTextFile {
++
++  private static Schema schema = new Schema();
++
++  private static Tuple baseTuple = new VTuple(10);
++
++  static {
++    schema.addColumn("col1", Type.BOOLEAN);
++    schema.addColumn("col2", Type.CHAR, 7);
++    schema.addColumn("col3", Type.INT2);
++    schema.addColumn("col4", Type.INT4);
++    schema.addColumn("col5", Type.INT8);
++    schema.addColumn("col6", Type.FLOAT4);
++    schema.addColumn("col7", Type.FLOAT8);
++    schema.addColumn("col8", Type.TEXT);
++    schema.addColumn("col9", Type.BLOB);
++    schema.addColumn("col10", Type.INET4);
++
++    baseTuple.put(new Datum[] {
++        DatumFactory.createBool(true),                // 0
++        DatumFactory.createChar("hyunsik"),           // 1
++        DatumFactory.createInt2((short) 17),          // 2
++        DatumFactory.createInt4(59),                  // 3
++        DatumFactory.createInt8(23L),                 // 4
++        DatumFactory.createFloat4(77.9f),             // 5
++        DatumFactory.createFloat8(271.9d),            // 6
++        DatumFactory.createText("hyunsik"),           // 7
++        DatumFactory.createBlob("hyunsik".getBytes()),// 8
++        DatumFactory.createInet4("192.168.0.1"),      // 9
++    });
++  }
++
++  public static Path getResourcePath(String path, String suffix) {
++    URL resultBaseURL = ClassLoader.getSystemResource(path);
++    return new Path(resultBaseURL.toString(), suffix);
++  }
++
++  public static Path getResultPath(Class clazz, String fileName) {
++    return new Path(getResourcePath("results", clazz.getSimpleName()), fileName);
++  }
++
++  public static String getResultText(Class clazz, String fileName) throws IOException {
++    FileSystem localFS = FileSystem.getLocal(new Configuration());
++    Path path = getResultPath(clazz, fileName);
++    Preconditions.checkState(localFS.exists(path) && localFS.isFile(path));
++    return FileUtil.readTextFile(new File(path.toUri()));
++  }
++
++  private static final FileFragment getFileFragment(String fileName) throws IOException {
++    TajoConf conf = new TajoConf();
++    Path tablePath = new Path(getResourcePath("dataset", "TestDelimitedTextFile"), fileName);
++    FileSystem fs = FileSystem.getLocal(conf);
++    FileStatus status = fs.getFileStatus(tablePath);
++    return new FileFragment("table", tablePath, 0, status.getLen());
++  }
++
++  @Test
++  public void testIgnoreAllErrors() throws IOException {
++    TajoConf conf = new TajoConf();
++
++    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
++    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
++    FileFragment fragment = getFileFragment("testErrorTolerance1.json");
++    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
++    scanner.init();
++
++    Tuple tuple;
++    int i = 0;
++    while ((tuple = scanner.next()) != null) {
++      assertEquals(baseTuple, tuple);
++      i++;
++    }
++    assertEquals(3, i);
++    scanner.close();
++  }
++
++  @Test
++  public void testIgnoreOneErrorTolerance() throws IOException {
++    TajoConf conf = new TajoConf();
++
++    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
++    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
++    FileFragment fragment = getFileFragment("testErrorTolerance1.json");
++    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
++    scanner.init();
++
++    assertNotNull(scanner.next());
++    assertNotNull(scanner.next());
++    try {
++      scanner.next();
++    } catch (IOException ioe) {
++      System.out.println(ioe);
++      return;
++    } finally {
++      scanner.close();
++    }
++    fail();
++  }
++
++  @Test
++  public void testNoErrorTolerance() throws IOException {
++    TajoConf conf = new TajoConf();
++    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
++    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
++    FileFragment fragment = getFileFragment("testErrorTolerance2.json");
++    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
++    scanner.init();
++
++    try {
++      scanner.next();
++    } catch (IOException ioe) {
++      return;
++    } finally {
++      scanner.close();
++    }
++    fail();
++  }
++}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --cc 
tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 0000000,0000000..d8e359f
new file mode 100644
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@@ -1,0 -1,0 +1,193 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage;
++
++import io.netty.buffer.ByteBuf;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.io.IOUtils;
++import org.apache.hadoop.io.compress.DeflateCodec;
++import org.apache.tajo.catalog.CatalogUtil;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
++import org.apache.tajo.common.TajoDataTypes.Type;
++import org.apache.tajo.conf.TajoConf;
++import org.apache.tajo.datum.DatumFactory;
++import org.apache.tajo.datum.NullDatum;
++import org.apache.tajo.storage.fragment.FileFragment;
++import org.apache.tajo.storage.text.ByteBufLineReader;
++import org.apache.tajo.storage.text.DelimitedLineReader;
++import org.apache.tajo.storage.text.DelimitedTextFile;
++import org.apache.tajo.util.CommonTestingUtil;
++import org.apache.tajo.util.FileUtil;
++import org.junit.Test;
++
++import java.io.File;
++import java.io.FileInputStream;
++import java.io.IOException;
++import java.util.concurrent.atomic.AtomicInteger;
++
++import static org.junit.Assert.*;
++
++public class TestLineReader {
++  private static String TEST_PATH = "target/test-data/TestLineReader";
++
++  @Test
++  public void testByteBufLineReader() throws IOException {
++    TajoConf conf = new TajoConf();
++    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
++    FileSystem fs = testDir.getFileSystem(conf);
++
++    Schema schema = new Schema();
++    schema.addColumn("id", Type.INT4);
++    schema.addColumn("age", Type.INT8);
++    schema.addColumn("comment", Type.TEXT);
++    schema.addColumn("comment2", Type.TEXT);
++
++    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
++    Path tablePath = new Path(testDir, "line.data");
++    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
++        null, null, meta, schema, tablePath);
++    appender.enableStats();
++    appender.init();
++    int tupleNum = 10000;
++    VTuple vTuple;
++
++    for (int i = 0; i < tupleNum; i++) {
++      vTuple = new VTuple(4);
++      vTuple.put(0, DatumFactory.createInt4(i + 1));
++      vTuple.put(1, DatumFactory.createInt8(25L));
++      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
++      vTuple.put(3, NullDatum.get());
++      appender.addTuple(vTuple);
++    }
++    appender.close();
++
++    FileStatus status = fs.getFileStatus(tablePath);
++
++    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
++    assertEquals(status.getLen(), channel.available());
++    ByteBufLineReader reader = new ByteBufLineReader(channel);
++    assertEquals(status.getLen(), reader.available());
++
++    long totalRead = 0;
++    int i = 0;
++    AtomicInteger bytes = new AtomicInteger();
++    for (;;) {
++      ByteBuf buf = reader.readLineBuf(bytes);
++      if (buf == null) break;
++
++      totalRead += bytes.get();
++      i++;
++    }
++    IOUtils.cleanup(null, reader, channel, fs);
++    assertEquals(tupleNum, i);
++    assertEquals(status.getLen(), totalRead);
++    assertEquals(status.getLen(), reader.readBytes());
++  }
++
++  @Test
++  public void testLineDelimitedReader() throws IOException {
++    TajoConf conf = new TajoConf();
++    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
++    FileSystem fs = testDir.getFileSystem(conf);
++
++    Schema schema = new Schema();
++    schema.addColumn("id", Type.INT4);
++    schema.addColumn("age", Type.INT8);
++    schema.addColumn("comment", Type.TEXT);
++    schema.addColumn("comment2", Type.TEXT);
++
++    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
++    meta.putOption("compression.codec", 
DeflateCodec.class.getCanonicalName());
++
++    Path tablePath = new Path(testDir, "line1." + 
DeflateCodec.class.getSimpleName());
++    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
++        null, null, meta, schema, tablePath);
++    appender.enableStats();
++    appender.init();
++    int tupleNum = 10000;
++    VTuple vTuple;
++
++    long splitOffset = 0;
++    for (int i = 0; i < tupleNum; i++) {
++      vTuple = new VTuple(4);
++      vTuple.put(0, DatumFactory.createInt4(i + 1));
++      vTuple.put(1, DatumFactory.createInt8(25L));
++      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
++      vTuple.put(3, NullDatum.get());
++      appender.addTuple(vTuple);
++
++      if (i == tupleNum / 2) {
++        splitOffset = appender.getOffset();
++      }
++    }
++    String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
++    appender.close();
++
++    tablePath = tablePath.suffix(extension);
++    FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset);
++    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // if the file is compressed, the reader reads to EOF
++    assertTrue(reader.isCompressed());
++    assertFalse(reader.isReadable());
++    reader.init();
++    assertTrue(reader.isReadable());
++
++    int i = 0;
++    while (reader.isReadable()) {
++      ByteBuf buf = reader.readLine();
++      if (buf == null) break;
++      i++;
++    }
++
++    IOUtils.cleanup(null, reader, fs);
++    assertEquals(tupleNum, i);
++  }
++
++  @Test
++  public void testByteBufLineReaderWithoutTerminating() throws IOException {
++    String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile();
++    File file = new File(path);
++    String data = FileUtil.readTextFile(file);
++
++    ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
++
++    assertEquals(file.length(), channel.available());
++    ByteBufLineReader reader = new ByteBufLineReader(channel);
++    assertEquals(file.length(), reader.available());
++
++    long totalRead = 0;
++    int i = 0;
++    AtomicInteger bytes = new AtomicInteger();
++    for (;;) {
++      ByteBuf buf = reader.readLineBuf(bytes);
++      if (buf == null) break;
++      totalRead += bytes.get();
++      i++;
++    }
++    IOUtils.cleanup(null, reader);
++    assertEquals(file.length(), totalRead);
++    assertEquals(file.length(), reader.readBytes());
++    assertEquals(data.split("\n").length, i);
++  }
++}
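
Note on the byte accounting checked above: the AtomicInteger passed to
ByteBufLineReader.readLineBuf receives the number of bytes consumed for the
returned line, terminator included (a contract inferred from the assertions,
not from the reader's source), so the per-line counts sum to the file length
even when the final line has no trailing newline.

    // Sketch of the loop pattern both tests use (assumed contract).
    AtomicInteger bytes = new AtomicInteger();
    long totalRead = 0;
    for (ByteBuf line; (line = reader.readLineBuf(bytes)) != null; ) {
      totalRead += bytes.get();  // line bytes plus its delimiter, if any
    }
    // totalRead == reader.readBytes() == the underlying file length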

http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --cc 
tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
index 0000000,0000000..12ea551
new file mode 100644
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
@@@ -1,0 -1,0 +1,72 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage;
++
++import io.netty.buffer.ByteBuf;
++import io.netty.buffer.Unpooled;
++import io.netty.util.CharsetUtil;
++import org.apache.tajo.storage.text.FieldSplitProcessor;
++import org.apache.tajo.storage.text.LineSplitProcessor;
++import org.junit.Test;
++
++import java.io.IOException;
++
++import static io.netty.util.ReferenceCountUtil.releaseLater;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++public class TestSplitProcessor {
++
++  @Test
++  public void testFieldSplitProcessor() throws IOException {
++    String data = "abc||de";
++    final ByteBuf buf = releaseLater(
++        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
++
++    final int len = buf.readableBytes();
++    FieldSplitProcessor processor = new FieldSplitProcessor('|');
++
++    assertEquals(3, buf.forEachByte(0, len, processor));
++    assertEquals(4, buf.forEachByte(4, len - 4, processor));
++    assertEquals(-1, buf.forEachByte(5, len - 5, processor));
++  }
++
++  @Test
++  public void testLineSplitProcessor() throws IOException {
++    String data = "abc\r\n\n";
++    final ByteBuf buf = releaseLater(
++        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
++
++    final int len = buf.readableBytes();
++    LineSplitProcessor processor = new LineSplitProcessor();
++
++    // find CR
++    assertEquals(3, buf.forEachByte(0, len, processor));
++
++    // find the LF of the CRLF pair
++    assertEquals(4, buf.forEachByte(4, len - 4, processor));
++    assertEquals('\n', buf.getByte(4));
++    // need to skip the LF
++    assertTrue(processor.isPrevCharCR());
++
++    // find LF
++    assertEquals(5, buf.forEachByte(5, len - 5, processor)); // line length is zero
++  }
++}
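
Note on the expected indices above: both processors plug into Netty's
ByteBuf.forEachByte(index, length, processor), which scans the range and
returns the absolute index of the first byte for which the processor
returns false, or -1 when the range is exhausted. A small sketch under
that documented contract:

    // 'ab|c': the field delimiter sits at absolute index 2.
    ByteBuf buf = Unpooled.copiedBuffer("ab|c", CharsetUtil.ISO_8859_1);
    int pos = buf.forEachByte(0, buf.readableBytes(), new FieldSplitProcessor('|'));
    // pos == 2; a subsequent scan would start at pos + 1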

http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
----------------------------------------------------------------------
diff --cc 
tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
index 0000000,0000000..70282d9
new file mode 100644
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
@@@ -1,0 -1,0 +1,101 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.json;
++
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.tajo.catalog.CatalogUtil;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.catalog.proto.CatalogProtos;
++import org.apache.tajo.common.TajoDataTypes;
++import org.apache.tajo.conf.TajoConf;
++import org.apache.tajo.datum.Datum;
++import org.apache.tajo.datum.DatumFactory;
++import org.apache.tajo.datum.NullDatum;
++import org.apache.tajo.storage.Scanner;
++import org.apache.tajo.storage.StorageManager;
++import org.apache.tajo.storage.Tuple;
++import org.apache.tajo.storage.VTuple;
++import org.apache.tajo.storage.fragment.FileFragment;
++import org.junit.Test;
++
++import java.io.IOException;
++import java.net.URL;
++
++import static org.junit.Assert.*;
++
++public class TestJsonSerDe {
++  private static Schema schema = new Schema();
++
++  static {
++    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
++    schema.addColumn("col2", TajoDataTypes.Type.CHAR, 7);
++    schema.addColumn("col3", TajoDataTypes.Type.INT2);
++    schema.addColumn("col4", TajoDataTypes.Type.INT4);
++    schema.addColumn("col5", TajoDataTypes.Type.INT8);
++    schema.addColumn("col6", TajoDataTypes.Type.FLOAT4);
++    schema.addColumn("col7", TajoDataTypes.Type.FLOAT8);
++    schema.addColumn("col8", TajoDataTypes.Type.TEXT);
++    schema.addColumn("col9", TajoDataTypes.Type.BLOB);
++    schema.addColumn("col10", TajoDataTypes.Type.INET4);
++    schema.addColumn("col11", TajoDataTypes.Type.NULL_TYPE);
++  }
++
++  public static Path getResourcePath(String path, String suffix) {
++    URL resultBaseURL = ClassLoader.getSystemResource(path);
++    return new Path(resultBaseURL.toString(), suffix);
++  }
++
++  @Test
++  public void testVariousType() throws IOException {
++    TajoConf conf = new TajoConf();
++
++    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
++    Path tablePath = new Path(getResourcePath("dataset", "TestJsonSerDe"), "testVariousType.json");
++    FileSystem fs = FileSystem.getLocal(conf);
++    FileStatus status = fs.getFileStatus(tablePath);
++    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
++    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
++    scanner.init();
++
++    Tuple tuple = scanner.next();
++    assertNotNull(tuple);
++    assertNull(scanner.next());
++    scanner.close();
++
++    Tuple baseTuple = new VTuple(11);
++    baseTuple.put(new Datum[] {
++        DatumFactory.createBool(true),                  // 0
++        DatumFactory.createChar("hyunsik"),             // 1
++        DatumFactory.createInt2((short) 17),            // 2
++        DatumFactory.createInt4(59),                    // 3
++        DatumFactory.createInt8(23L),                   // 4
++        DatumFactory.createFloat4(77.9f),               // 5
++        DatumFactory.createFloat8(271.9d),              // 6
++        DatumFactory.createText("hyunsik"),             // 7
++        DatumFactory.createBlob("hyunsik".getBytes()),  // 8
++        DatumFactory.createInet4("192.168.0.1"),        // 9
++        NullDatum.get(),                                // 10
++    });
++
++    assertEquals(baseTuple, tuple);
++  }
++}
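
Note: the testVariousType.json resource itself is not part of this diff;
given the schema and the expected tuple above, a record of the assumed
shape would be a single line like the following (col11 is NULL_TYPE, so it
can simply be omitted):

    {"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}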

http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
----------------------------------------------------------------------
diff --cc 
tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
index 0000000,0000000..739dfe7
new file mode 100644
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance1.json
@@@ -1,0 -1,0 +1,6 @@@
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"}
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"}
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0073ef23/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
----------------------------------------------------------------------
diff --cc 
tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
index 0000000,0000000..8256b72
new file mode 100644
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testErrorTolerance2.json
@@@ -1,0 -1,0 +1,4 @@@
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"}
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"}
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, 
"col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": 
"192.168.0.1"}
