Repository: tajo
Updated Branches:
  refs/heads/master 5a72e2f62 -> 3e305b15f


http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
new file mode 100644
index 0000000..eb1929e
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.ByteBufInputChannel;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
 * Reads newline-terminated lines from a file fragment, transparently handling
 * non-splittable compression codecs (e.g. gzip). Each call to {@link #readLine()}
 * returns one line as a Netty {@link ByteBuf}, or {@code null} at end of data.
 *
 * <p>Not thread-safe: one reader instance per scanner/thread.
 */
public class DelimitedLineReader implements Closeable {
  private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class);
  // Size of the direct read buffer handed to ByteBufLineReader.
  private final static int DEFAULT_PAGE_SIZE = 128 * 1024;

  private FileSystem fs;
  private FSDataInputStream fis;
  private InputStream is; // decompressed stream (aliases fis when no codec applies)
  private CompressionCodecFactory factory;
  private CompressionCodec codec;
  private Decompressor decompressor;

  // startOffset/end: fragment byte range within the file; pos: current
  // (uncompressed) read position, advanced per line read.
  private long startOffset, end, pos;
  // true before init() and after the last line has been consumed.
  private boolean eof = true;
  private ByteBufLineReader lineReader;
  // Out-parameter reused across readLine() calls to receive the number of
  // bytes consumed for the returned line (including the line terminator).
  private AtomicInteger tempReadBytes = new AtomicInteger();
  private FileFragment fragment;
  private Configuration conf;

  /**
   * @param conf     storage configuration (used to resolve the codec and FS)
   * @param fragment file range to read
   * @throws IOException if codec resolution fails
   */
  public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException {
    this.fragment = fragment;
    this.conf = conf;
    this.factory = new CompressionCodecFactory(conf);
    this.codec = factory.getCodec(fragment.getPath());
    if (this.codec instanceof SplittableCompressionCodec) {
      // bzip2 (splittable) is rejected: it does not fit this reader's
      // single-stream, multi-thread-per-fragment model.
      throw new NotImplementedException();
    }
  }

  /** Opens the underlying streams and positions the reader at the fragment start. */
  public void init() throws IOException {
    if (fs == null) {
      fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath());
    }
    if (fis == null) fis = fs.open(fragment.getPath());
    pos = startOffset = fragment.getStartKey();
    // NOTE(review): getEndKey() is treated as the fragment length here, so
    // 'end' is an absolute file offset — confirm against FileFragment's contract.
    end = startOffset + fragment.getEndKey();

    if (codec != null) {
      // Compressed file: decode the whole stream; no seek is possible.
      decompressor = CodecPool.getDecompressor(codec);
      is = new DataInputStream(codec.createInputStream(fis, decompressor));
      ByteBufInputChannel channel = new ByteBufInputChannel(is);
      lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE));
    } else {
      // Plain file: seek directly to the fragment start.
      fis.seek(startOffset);
      is = fis;

      ByteBufInputChannel channel = new ByteBufInputChannel(is);
      lineReader = new ByteBufLineReader(channel,
          BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end)));
    }
    eof = false;
  }

  /**
   * Current position in the on-disk (possibly compressed) file; for
   * uncompressed files this equals the logical position.
   */
  public long getCompressedPosition() throws IOException {
    long retVal;
    if (isCompressed()) {
      retVal = fis.getPos();
    } else {
      retVal = pos;
    }
    return retVal;
  }

  /** Logical (decompressed) read position. */
  public long getUnCompressedPosition() throws IOException {
    return pos;
  }

  /** Number of logical bytes consumed since init(). */
  public long getReadBytes() {
    return pos - startOffset;
  }

  /** @return true while more lines may be available. */
  public boolean isReadable() {
    return !eof;
  }

  /**
   * Returns the next line as a ByteBuf, or {@code null} at end of data.
   * Reading past the fragment end (uncompressed case) flips eof so the NEXT
   * call returns null — the line straddling the boundary is still returned.
   */
  public ByteBuf readLine() throws IOException {
    if (eof) {
      return null;
    }

    ByteBuf buf = lineReader.readLineBuf(tempReadBytes);
    if (buf == null) {
      eof = true;
    } else {
      pos += tempReadBytes.get();
    }

    if (!isCompressed() && getCompressedPosition() > end) {
      eof = true;
    }
    return buf;
  }

  /** @return true when a compression codec applies to this file. */
  public boolean isCompressed() {
    return codec != null;
  }

  @Override
  public void close() throws IOException {
    try {
      // cleanup() logs and swallows individual close failures.
      IOUtils.cleanup(LOG, lineReader, is, fis);
      fs = null;
      is = null;
      fis = null;
      lineReader = null;
    } finally {
      // Always return the pooled decompressor, even if stream close failed.
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
        decompressor = null;
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
new file mode 100644
index 0000000..dbf8435
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+
/**
 * Delimited-text storage format (TEXTFILE). Rows are LF-terminated; fields are
 * separated by a configurable single-character delimiter. Provides an
 * {@link DelimitedTextFileAppender} for writing and a
 * {@link DelimitedTextFileScanner} for (splittable, when uncompressed) reading.
 */
public class DelimitedTextFile {

  public static final byte LF = '\n';
  public static int EOF = -1;

  private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);

  /**
   * Writes tuples as delimiter-separated text rows, buffering serialized bytes
   * in memory and flushing to an (optionally compressed) output stream.
   */
  public static class DelimitedTextFileAppender extends FileAppender {
    private final TableMeta meta;
    private final Schema schema;
    private final int columnNum;
    private final FileSystem fs;
    private FSDataOutputStream fos;
    private DataOutputStream outputStream;
    private CompressionOutputStream deflateFilter; // non-null only when compressing
    private char delimiter;
    private TableStatistics stats = null;
    private Compressor compressor;
    private CompressionCodecFactory codecFactory;
    private CompressionCodec codec;
    private Path compressedPath; // actual output path (path + codec extension)
    private byte[] nullChars;    // literal written for null CHAR/TEXT fields
    private int BUFFER_SIZE = 128 * 1024; // flush threshold for the row buffer
    private int bufferedBytes = 0;
    private long pos = 0;        // logical (pre-compression) byte offset

    private NonSyncByteArrayOutputStream os; // in-memory row buffer
    private FieldSerializerDeserializer serde;

    public DelimitedTextFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
        throws IOException {
      super(conf, schema, meta, path);
      this.fs = path.getFileSystem(conf);
      this.meta = meta;
      this.schema = schema;
      // Delimiter comes from table options; unescapeJava lets users write "\t" etc.
      this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER,
          StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
      this.columnNum = schema.size();

      String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL,
          NullDatum.DEFAULT_TEXT));
      if (StringUtils.isEmpty(nullCharacters)) {
        nullChars = NullDatum.get().asTextBytes();
      } else {
        nullChars = nullCharacters.getBytes();
      }
    }

    /**
     * Opens the output stream (wiring in the compression codec when the table
     * options request one), instantiates the serde, and resets counters.
     *
     * @throws FileNotFoundException      if the parent directory does not exist
     * @throws AlreadyExistsStorageException if the target file already exists
     */
    @Override
    public void init() throws IOException {
      if (!fs.exists(path.getParent())) {
        throw new FileNotFoundException(path.toString());
      }

      if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
        String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
        codecFactory = new CompressionCodecFactory(conf);
        codec = codecFactory.getCodecByClassName(codecName);
        compressor = CodecPool.getCompressor(codec);
        if (compressor != null) compressor.reset();  // builtin gzip returns null here

        String extension = codec.getDefaultExtension();
        compressedPath = path.suffix(extension);

        if (fs.exists(compressedPath)) {
          throw new AlreadyExistsStorageException(compressedPath);
        }

        fos = fs.create(compressedPath);
        deflateFilter = codec.createOutputStream(fos, compressor);
        outputStream = new DataOutputStream(deflateFilter);

      } else {
        if (fs.exists(path)) {
          throw new AlreadyExistsStorageException(path);
        }
        fos = fs.create(path);
        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
      }

      if (enabledStats) {
        this.stats = new TableStatistics(this.schema);
      }

      try {
        // The De/Serializer interface is still under discussion, so custom
        // serde classes are effectively disabled: the cast below only accepts
        // TextFieldSerializerDeserializer (the default).
        String serdeClass = this.meta.getOption(StorageConstants.TEXTFILE_SERDE,
            TextFieldSerializerDeserializer.class.getName());
        serde = (TextFieldSerializerDeserializer) ReflectionUtils.newInstance(Class.forName(serdeClass), conf);
      } catch (Throwable e) {
        LOG.error(e.getMessage(), e);
        throw new IOException(e);
      }

      if (os == null) {
        os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
      }

      os.reset();
      pos = fos.getPos();
      bufferedBytes = 0;
      super.init();
    }


    /**
     * Serializes one tuple into the row buffer (fields joined by the delimiter,
     * row terminated by LF) and flushes when the buffer threshold is exceeded.
     */
    @Override
    public void addTuple(Tuple tuple) throws IOException {
      Datum datum;
      int rowBytes = 0;

      for (int i = 0; i < columnNum; i++) {
        datum = tuple.get(i);
        rowBytes += serde.serialize(os, datum, schema.getColumn(i), i, nullChars);

        if (columnNum - 1 > i) {
          // Delimiter between fields, but not after the last one.
          os.write((byte) delimiter);
          rowBytes += 1;
        }
      }
      os.write(LF);
      rowBytes += 1;

      pos += rowBytes;
      bufferedBytes += rowBytes;
      if (bufferedBytes > BUFFER_SIZE) {
        flushBuffer();
      }
      // Statistical section
      if (enabledStats) {
        stats.incrementRow();
      }
    }

    /** Drains the in-memory row buffer into the output stream. */
    private void flushBuffer() throws IOException {
      if (os.getLength() > 0) {
        os.writeTo(outputStream);
        os.reset();
        bufferedBytes = 0;
      }
    }

    /** @return logical (pre-compression) bytes appended so far. */
    @Override
    public long getOffset() throws IOException {
      return pos;
    }

    @Override
    public void flush() throws IOException {
      flushBuffer();
      outputStream.flush();
    }

    @Override
    public void close() throws IOException {

      try {
        if(outputStream != null){
          flush();
        }

        // Statistical section
        if (enabledStats) {
          stats.setNumBytes(getOffset());
        }

        // Finish the compression stream so trailing codec blocks are written.
        if (deflateFilter != null) {
          deflateFilter.finish();
          deflateFilter.resetState();
          deflateFilter = null;
        }

        os.close();
      } finally {
        IOUtils.cleanup(LOG, fos);
        // Return the pooled compressor even when the close above failed.
        if (compressor != null) {
          CodecPool.returnCompressor(compressor);
          compressor = null;
        }
      }
    }

    @Override
    public TableStats getStats() {
      if (enabledStats) {
        return stats.getTableStat();
      } else {
        return null;
      }
    }

    public boolean isCompress() {
      return compressor != null;
    }

    /** @return the codec's file extension, or "" when not compressing. */
    public String getExtension() {
      return codec != null ? codec.getDefaultExtension() : "";
    }
  }

  /**
   * Scanner over a fragment of a delimited text file. Splittable only when the
   * file is uncompressed; for a non-zero start offset the first (partial) line
   * is skipped because it belongs to the previous split.
   */
  public static class DelimitedTextFileScanner extends FileScanner {

    private boolean splittable = false;
    private final long startOffset;
    private final long endOffset;

    private int recordCount = 0;
    private int[] targetColumnIndexes; // projected column ids, sorted ascending

    private ByteBuf nullChars; // direct buffer holding the null literal
    private FieldSerializerDeserializer serde;
    private DelimitedLineReader reader;
    private FieldSplitProcessor processor; // finds field-delimiter positions

    public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
                                    final FileFragment fragment)
        throws IOException {
      super(conf, schema, meta, fragment);
      reader = new DelimitedLineReader(conf, fragment);
      if (!reader.isCompressed()) {
        splittable = true;
      }

      startOffset = fragment.getStartKey();
      endOffset = startOffset + fragment.getEndKey();

      //Delimiter
      String delim = meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
      this.processor = new FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0));
    }

    /**
     * (Re)opens the line reader, resolves the projection, and instantiates the
     * serde. Safe to call again for {@link #reset()}.
     */
    @Override
    public void init() throws IOException {
      // Release any buffer left over from a previous init()/reset() cycle.
      if (nullChars != null) {
        nullChars.release();
      }

      String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
          NullDatum.DEFAULT_TEXT));
      byte[] bytes;
      if (StringUtils.isEmpty(nullCharacters)) {
        bytes = NullDatum.get().asTextBytes();
      } else {
        bytes = nullCharacters.getBytes();
      }

      nullChars = BufferPool.directBuffer(bytes.length, bytes.length);
      nullChars.writeBytes(bytes);

      if (reader != null) {
        reader.close();
      }
      reader = new DelimitedLineReader(conf, fragment);
      reader.init();
      recordCount = 0;

      // No explicit projection means all columns.
      if (targets == null) {
        targets = schema.toArray();
      }

      targetColumnIndexes = new int[targets.length];
      for (int i = 0; i < targets.length; i++) {
        targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
      }

      try {
        // The De/Serializer interface is still under discussion, so custom
        // serde classes are effectively disabled: the cast below only accepts
        // TextFieldSerializerDeserializer (the default).
        String serdeClass = this.meta.getOption(StorageConstants.TEXTFILE_SERDE,
            TextFieldSerializerDeserializer.class.getName());
        serde = (TextFieldSerializerDeserializer) ReflectionUtils.newInstance(Class.forName(serdeClass), conf);
      } catch (Throwable e) {
        LOG.error(e.getMessage(), e);
        throw new IOException(e);
      }

      super.init();
      // fillTuple walks fields left-to-right, so the projection must be sorted.
      Arrays.sort(targetColumnIndexes);
      if (LOG.isDebugEnabled()) {
        LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset);
      }

      if (startOffset > 0) {
        reader.readLine();  // skip first line;
      }
    }

    /** Reads the next raw line, counting it toward recordCount. */
    public ByteBuf readLine() throws IOException {
      ByteBuf buf = reader.readLine();
      if (buf == null) {
        return null;
      } else {
        recordCount++;
      }

      return buf;
    }

    /** Fraction of the fragment consumed, based on compressed file position. */
    @Override
    public float getProgress() {
      try {
        if (!reader.isReadable()) {
          return 1.0f;
        }
        long filePos = reader.getCompressedPosition();
        if (startOffset == filePos) {
          return 0.0f;
        } else {
          long readBytes = filePos - startOffset;
          long remainingBytes = Math.max(endOffset - filePos, 0);
          return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes));
        }
      } catch (IOException e) {
        LOG.error(e.getMessage(), e);
        return 0.0f;
      }
    }

    /** @return the next tuple, or {@code null} at end of fragment. */
    @Override
    public Tuple next() throws IOException {
      try {
        if (!reader.isReadable()) return null;

        ByteBuf buf = readLine();
        if (buf == null) return null;

        // e.g. count(*): no columns requested, but the row still counts.
        if (targets.length == 0) {
          return EmptyTuple.get();
        }

        VTuple tuple = new VTuple(schema.size());
        fillTuple(schema, tuple, buf, targetColumnIndexes);
        return tuple;
      } catch (Throwable t) {
        LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t);
        throw new IOException(t);
      }
    }

    /**
     * Splits {@code lineBuf} on the field delimiter and deserializes only the
     * projected columns (given as a sorted index array) into {@code dst}.
     */
    private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[] target) throws IOException {
      int[] projection = target;
      if (lineBuf == null || target == null || target.length == 0) {
        return;
      }

      final int rowLength = lineBuf.readableBytes();
      int start = 0, fieldLength = 0, end = 0;

      //Projection
      int currentTarget = 0; // next slot in the projection to satisfy
      int currentIndex = 0;  // ordinal of the field currently being scanned

      while (end != -1) {
        // Position of the next delimiter, or negative at end of line.
        end = lineBuf.forEachByte(start, rowLength - start, processor);

        if (end < 0) {
          fieldLength = rowLength - start;
        } else {
          fieldLength = end - start;
        }

        if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
          Datum datum = serde.deserialize(lineBuf.slice(start, fieldLength),
              schema.getColumn(currentIndex), currentIndex, nullChars);
          dst.put(currentIndex, datum);
          currentTarget++;
        }

        // All projected columns filled: no need to scan the rest of the line.
        if (projection.length == currentTarget) {
          break;
        }

        start = end + 1;
        currentIndex++;
      }
    }

    @Override
    public void reset() throws IOException {
      init();
    }

    @Override
    public void close() throws IOException {
      try {
        if (nullChars != null) {
          nullChars.release();
          nullChars = null;
        }

        if (tableStats != null && reader != null) {
          tableStats.setReadBytes(reader.getReadBytes());  // actual processed bytes (decompressed bytes + overhead)
          tableStats.setNumRows(recordCount);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("DelimitedTextFileScanner processed record:" + recordCount);
        }
      } finally {
        IOUtils.cleanup(LOG, reader);
        reader = null;
      }
    }

    @Override
    public boolean isProjectable() {
      return true;
    }

    @Override
    public boolean isSelectable() {
      return false;
    }

    @Override
    public void setSearchCondition(Object expr) {
    }

    @Override
    public boolean isSplittable() {
      return splittable;
    }

    @Override
    public TableStats getInputStats() {
      if (tableStats != null && reader != null) {
        tableStats.setReadBytes(reader.getReadBytes());  // actual processed bytes (decompressed bytes + overhead)
        tableStats.setNumRows(recordCount);
        tableStats.setNumBytes(fragment.getEndKey());
      }
      return tableStats;
    }
  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
new file mode 100644
index 0000000..a5ac142
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class FieldSplitProcessor implements ByteBufProcessor {
+  private char delimiter; //the ascii separate character
+
+  public FieldSplitProcessor(char recordDelimiterByte) {
+    this.delimiter = recordDelimiterByte;
+  }
+
+  @Override
+  public boolean process(byte value) throws Exception {
+    return delimiter != value;
+  }
+
+  public char getDelimiter() {
+    return delimiter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
new file mode 100644
index 0000000..a130527
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class LineSplitProcessor implements ByteBufProcessor {
+  public static final byte CR = '\r';
+  public static final byte LF = '\n';
+  private boolean prevCharCR = false; //true of prev char was CR
+
+  @Override
+  public boolean process(byte value) throws Exception {
+    switch (value) {
+      case LF:
+        return false;
+      case CR:
+        prevCharCR = true;
+        return false;
+      default:
+        prevCharCR = false;
+        return true;
+    }
+  }
+
+  public boolean isPrevCharCR() {
+    return prevCharCR;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
new file mode 100644
index 0000000..0057b54
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import com.google.protobuf.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.util.NumberUtil;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+//Compatibility with Apache Hive
//Compatibility with Apache Hive
/**
 * Serializes and deserializes individual fields to/from their delimited-text
 * representation, keeping the textual encoding compatible with Apache Hive
 * (e.g. booleans as "true"/"false", binary as Base64, protobuf as JSON).
 *
 * <p>Stateless apart from the shared {@link ProtobufJsonFormat} instance.
 */
public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer {
  public static final byte[] trueBytes = "true".getBytes();
  public static final byte[] falseBytes = "false".getBytes();
  private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();

  /** Non-text types: an empty field or the null literal both mean SQL NULL. */
  private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {
    return !val.isReadable() || nullBytes.equals(val);
  }

  /** Text types: the empty string is a valid value, so only the null literal means NULL. */
  private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) {
    return val.readableBytes() > 0 && nullBytes.equals(val);
  }

  /**
   * Writes {@code datum} to {@code out} in text form.
   *
   * @param out         destination stream
   * @param datum       value to write; {@code null} or {@link NullDatum} means SQL NULL
   * @param col         column whose declared type drives the encoding
   * @param columnIndex ordinal of the column (unused here, part of the interface)
   * @param nullChars   literal emitted for null CHAR/TEXT fields
   * @return the exact number of bytes written
   * @throws IOException on write failure
   */
  @Override
  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException {
    byte[] bytes;
    int length = 0;
    TajoDataTypes.DataType dataType = col.getDataType();

    if (datum == null || datum instanceof NullDatum) {
      switch (dataType.getType()) {
        case CHAR:
        case TEXT:
          // Only character types write the explicit null literal; all other
          // types represent NULL as a zero-length field.
          length = nullChars.length;
          out.write(nullChars);
          break;
        default:
          break;
      }
      return length;
    }

    switch (dataType.getType()) {
      case BOOLEAN:
        // BUG FIX: report the length of the bytes actually written.
        // Previously this always returned trueBytes.length (4), so writing
        // "false" (5 bytes) under-counted by one, corrupting the caller's
        // row-offset bookkeeping.
        bytes = datum.asBool() ? trueBytes : falseBytes;
        out.write(bytes);
        length = bytes.length;
        break;
      case CHAR:
        // Pad CHAR(n) values to the declared width with NUL bytes.
        // NOTE(review): assumes datum.size() <= dataType.getLength(); a longer
        // value would yield a negative pad size — confirm upstream validation.
        byte[] pad = new byte[dataType.getLength() - datum.size()];
        bytes = datum.asTextBytes();
        out.write(bytes);
        out.write(pad);
        length = bytes.length + pad.length;
        break;
      case TEXT:
      case BIT:
      case INT2:
      case INT4:
      case INT8:
      case FLOAT4:
      case FLOAT8:
      case INET4:
      case DATE:
      case INTERVAL:
        bytes = datum.asTextBytes();
        length = bytes.length;
        out.write(bytes);
        break;
      case TIME:
        // Times are rendered in the server's current time zone.
        bytes = ((TimeDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes();
        length = bytes.length;
        out.write(bytes);
        break;
      case TIMESTAMP:
        bytes = ((TimestampDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes();
        length = bytes.length;
        out.write(bytes);
        break;
      case INET6:
      case BLOB:
        // Binary data is Base64-encoded (no line chunking) for Hive compatibility.
        bytes = Base64.encodeBase64(datum.asByteArray(), false);
        length = bytes.length;
        out.write(bytes, 0, length);
        break;
      case PROTOBUF:
        ProtobufDatum protobuf = (ProtobufDatum) datum;
        byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
        length = protoBytes.length;
        out.write(protoBytes, 0, protoBytes.length);
        break;
      case NULL_TYPE:
      default:
        break;
    }
    return length;
  }

  /**
   * Parses one text field into a {@link Datum} of the column's declared type.
   *
   * @param buf         the raw field bytes (a slice of the line buffer)
   * @param col         column whose declared type drives the decoding
   * @param columnIndex ordinal of the column (unused here, part of the interface)
   * @param nullChars   literal that denotes SQL NULL
   * @return the parsed datum, or {@link NullDatum} for null fields and
   *         unrecognized types
   * @throws IOException on parse failure
   */
  @Override
  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException{
    Datum datum;
    TajoDataTypes.Type type = col.getDataType().getType();
    boolean nullField;
    if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
      nullField = isNullText(buf, nullChars);
    } else {
      nullField = isNull(buf, nullChars);
    }

    if (nullField) {
      datum = NullDatum.get();
    } else {
      switch (type) {
        case BOOLEAN:
          // Any field starting with 't'/'T' is true; everything else is false.
          byte bool = buf.readByte();
          datum = DatumFactory.createBool(bool == 't' || bool == 'T');
          break;
        case BIT:
          datum = DatumFactory.createBit(Byte.parseByte(buf.toString(CharsetUtil.UTF_8)));
          break;
        case CHAR:
          // trim() drops the NUL/space padding written by serialize().
          datum = DatumFactory.createChar(buf.toString(CharsetUtil.UTF_8).trim());
          break;
        case INT1:
        case INT2: {
          //TODO zero-copy
          byte[] bytes = new byte[buf.readableBytes()];
          buf.readBytes(bytes);
          datum = DatumFactory.createInt2((short) NumberUtil.parseInt(bytes, 0, bytes.length));
          break;
        }
        case INT4: {
          //TODO zero-copy
          byte[] bytes = new byte[buf.readableBytes()];
          buf.readBytes(bytes);
          datum = DatumFactory.createInt4(NumberUtil.parseInt(bytes, 0, bytes.length));
          break;
        }
        case INT8:
          //TODO zero-copy
          datum = DatumFactory.createInt8(buf.toString(CharsetUtil.UTF_8));
          break;
        case FLOAT4:
          //TODO zero-copy
          datum = DatumFactory.createFloat4(buf.toString(CharsetUtil.UTF_8));
          break;
        case FLOAT8: {
          //TODO zero-copy
          byte[] bytes = new byte[buf.readableBytes()];
          buf.readBytes(bytes);
          datum = DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, 0, bytes.length));
          break;
        }
        case TEXT: {
          byte[] bytes = new byte[buf.readableBytes()];
          buf.readBytes(bytes);
          datum = DatumFactory.createText(bytes);
          break;
        }
        case DATE:
          datum = DatumFactory.createDate(buf.toString(CharsetUtil.UTF_8));
          break;
        case TIME:
          datum = DatumFactory.createTime(buf.toString(CharsetUtil.UTF_8));
          break;
        case TIMESTAMP:
          datum = DatumFactory.createTimestamp(buf.toString(CharsetUtil.UTF_8));
          break;
        case INTERVAL:
          datum = DatumFactory.createInterval(buf.toString(CharsetUtil.UTF_8));
          break;
        case PROTOBUF: {
          // Protobuf fields are stored as JSON text; rebuild the message.
          ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
          Message.Builder builder = factory.newBuilder();
          try {
            byte[] bytes = new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            protobufJsonFormat.merge(bytes, builder);
            datum = factory.createDatum(builder.build());
          } catch (IOException e) {
            LOG-free rethrow: // preserve the cause for the caller
            throw new RuntimeException(e);
          }
          break;
        }
        case INET4:
          datum = DatumFactory.createInet4(buf.toString(CharsetUtil.UTF_8));
          break;
        case BLOB: {
          byte[] bytes = new byte[buf.readableBytes()];
          buf.readBytes(bytes);
          datum = DatumFactory.createBlob(Base64.decodeBase64(bytes));
          break;
        }
        default:
          datum = NullDatum.get();
          break;
      }
    }
    return datum;
  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml 
b/tajo-storage/src/main/resources/storage-default.xml
index 4669477..f262585 100644
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ b/tajo-storage/src/main/resources/storage-default.xml
@@ -21,11 +21,6 @@
 
 <configuration>
   <property>
-    <name>tajo.storage.manager.v2</name>
-    <value>false</value>
-  </property>
-
-  <property>
     <name>tajo.storage.manager.maxReadBytes</name>
     <value>8388608</value>
     <description></description>
@@ -40,11 +35,15 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+    <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
   </property>
 
   <!--- Fragment Class Configurations -->
   <property>
+    <name>tajo.storage.fragment.textfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.csv.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
@@ -79,13 +78,13 @@
 
   <!--- Scanner Handler -->
   <property>
-    <name>tajo.storage.scanner-handler.csv.class</name>
-    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+    <name>tajo.storage.scanner-handler.textfile.class</name>
+    
<value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.csv.class</name>
-    <value>org.apache.tajo.storage.v2.CSVFileScanner</value>
+    <name>tajo.storage.scanner-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
   </property>
 
   <property>
@@ -94,74 +93,44 @@
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.raw.class</name>
-    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.rcfile.class</name>
     <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.rcfile.class</name>
-    <value>org.apache.tajo.storage.v2.RCFileScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.rowfile.class</name>
     <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.rowfile.class</name>
-    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.trevni.class</name>
     <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.trevni.class</name>
-    <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.parquet.class</name>
     <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.parquet.class</name>
-    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.sequencefile.class</name>
     <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.sequencefile.class</name>
-    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.avro.class</name>
     <value>org.apache.tajo.storage.avro.AvroScanner</value>
   </property>
 
+  <!--- Appender Handler -->
   <property>
-    <name>tajo.storage.scanner-handler.v2.avro.class</name>
-    <value>org.apache.tajo.storage.avro.AvroScanner</value>
+    <name>tajo.storage.appender-handler</name>
+    <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
   </property>
 
-  <!--- Appender Handler -->
   <property>
-    <name>tajo.storage.appender-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+    <name>tajo.storage.appender-handler.textfile.class</name>
+    
<value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
   </property>
 
   <property>

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
 
b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 212f374..fd5a63e 100644
--- 
a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ 
b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -37,6 +37,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
+import org.apache.tajo.storage.text.DelimitedTextFile;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -70,7 +71,8 @@ public class TestCompressionStorages {
     return Arrays.asList(new Object[][]{
         {StoreType.CSV},
         {StoreType.RCFILE},
-        {StoreType.SEQUENCEFILE}
+        {StoreType.SEQUENCEFILE},
+        {StoreType.TEXTFILE}
     });
   }
 
@@ -102,81 +104,11 @@ public class TestCompressionStorages {
   }
 
   @Test
-  public void testBzip2CodecCompressionData() throws IOException {
-    storageCompressionTest(storeType, BZip2Codec.class);
-  }
-
-  @Test
   public void testLz4CodecCompressionData() throws IOException {
     if(NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded())
     storageCompressionTest(storeType, Lz4Codec.class);
   }
 
-  @Test
-  public void testSplitCompressionData() throws IOException {
-    if(StoreType.CSV != storeType) return;
-
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT8);
-
-    TableMeta meta = CatalogUtil.newTableMeta(storeType);
-    meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
-
-    Path tablePath = new Path(testDir, "SplitCompression");
-    Appender appender = 
StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
-    appender.enableStats();
-    appender.init();
-
-    String extention = "";
-    if (appender instanceof CSVFile.CSVAppender) {
-      extention = ((CSVFile.CSVAppender) appender).getExtension();
-    }
-
-    int tupleNum = 100000;
-    VTuple vTuple;
-
-    for (int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(2);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createInt8(25l));
-      appender.addTuple(vTuple);
-    }
-    appender.close();
-
-    TableStats stat = appender.getStats();
-    assertEquals(tupleNum, stat.getNumRows().longValue());
-    tablePath = tablePath.suffix(extention);
-
-    FileStatus status = fs.getFileStatus(tablePath);
-    long fileLen = status.getLen();
-    long randomNum = (long) (Math.random() * fileLen) + 1;
-
-    FileFragment[] tablets = new FileFragment[2];
-    tablets[0] = new FileFragment("SplitCompression", tablePath, 0, randomNum);
-    tablets[1] = new FileFragment("SplitCompression", tablePath, randomNum, 
(fileLen - randomNum));
-
-    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, 
schema, tablets[0], schema);
-    assertTrue(scanner.isSplittable());
-    scanner.init();
-    int tupleCnt = 0;
-    Tuple tuple;
-    while ((tuple = scanner.next()) != null) {
-      tupleCnt++;
-    }
-    scanner.close();
-
-    scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, 
tablets[1], schema);
-    assertTrue(scanner.isSplittable());
-    scanner.init();
-    while ((tuple = scanner.next()) != null) {
-      tupleCnt++;
-    }
-
-    scanner.close();
-    assertEquals(tupleNum, tupleCnt);
-  }
-
   private void storageCompressionTest(StoreType storeType, Class<? extends 
CompressionCodec> codec) throws IOException {
     Schema schema = new Schema();
     schema.addColumn("id", Type.INT4);
@@ -199,6 +131,8 @@ public class TestCompressionStorages {
     String extension = "";
     if (appender instanceof CSVFile.CSVAppender) {
       extension = ((CSVFile.CSVAppender) appender).getExtension();
+    } else if (appender instanceof 
DelimitedTextFile.DelimitedTextFileAppender) {
+      extension = ((DelimitedTextFile.DelimitedTextFileAppender) 
appender).getExtension();
     }
 
     int tupleNum = 100000;

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java 
b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
new file mode 100644
index 0000000..ef6efdf
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.text.ByteBufLineReader;
+import org.apache.tajo.storage.text.DelimitedTextFile;
+import org.apache.tajo.storage.text.DelimitedLineReader;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+public class TestLineReader {
+       private static String TEST_PATH = "target/test-data/TestLineReader";
+
+  @Test
+  public void testByteBufLineReader() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    FileSystem fs = testDir.getFileSystem(conf);
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("comment", Type.TEXT);
+    schema.addColumn("comment2", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+    Path tablePath = new Path(testDir, "line.data");
+    FileAppender appender = (FileAppender) 
StorageManager.getStorageManager(conf).getAppender(meta, schema,
+        tablePath);
+    appender.enableStats();
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25l));
+      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+      vTuple.put(3, NullDatum.get());
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
+    assertEquals(status.getLen(), channel.available());
+    ByteBufLineReader reader = new ByteBufLineReader(channel);
+    assertEquals(status.getLen(), reader.available());
+
+    long totalRead = 0;
+    int i = 0;
+    AtomicInteger bytes = new AtomicInteger();
+    for(;;){
+      ByteBuf buf = reader.readLineBuf(bytes);
+      if(buf == null) break;
+
+      totalRead += bytes.get();
+      i++;
+    }
+    IOUtils.cleanup(null, reader, channel, fs);
+    assertEquals(tupleNum, i);
+    assertEquals(status.getLen(), totalRead);
+    assertEquals(status.getLen(), reader.readBytes());
+  }
+
+  @Test
+  public void testLineDelimitedReader() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    FileSystem fs = testDir.getFileSystem(conf);
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("comment", Type.TEXT);
+    schema.addColumn("comment2", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+    meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
+
+    Path tablePath = new Path(testDir, "line1." + 
DeflateCodec.class.getSimpleName());
+    FileAppender appender = (FileAppender) 
StorageManager.getStorageManager(conf).getAppender(meta, schema,
+        tablePath);
+    appender.enableStats();
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    long splitOffset = 0;
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(4);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25l));
+      vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+      vTuple.put(3, NullDatum.get());
+      appender.addTuple(vTuple);
+
+      if(i == (tupleNum / 2)){
+        splitOffset = appender.getOffset();
+      }
+    }
+    String extension = ((DelimitedTextFile.DelimitedTextFileAppender) 
appender).getExtension();
+    appender.close();
+
+    tablePath = tablePath.suffix(extension);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, 
splitOffset);
+    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // 
if file is compressed, will read to EOF
+    assertTrue(reader.isCompressed());
+    assertFalse(reader.isReadable());
+    reader.init();
+    assertTrue(reader.isReadable());
+
+
+    int i = 0;
+    while(reader.isReadable()){
+      ByteBuf buf = reader.readLine();
+      if(buf == null) break;
+      i++;
+    }
+
+    IOUtils.cleanup(null, reader, fs);
+    assertEquals(tupleNum, i);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java 
b/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
new file mode 100644
index 0000000..12ea551
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.CharsetUtil;
+import org.apache.tajo.storage.text.FieldSplitProcessor;
+import org.apache.tajo.storage.text.LineSplitProcessor;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static io.netty.util.ReferenceCountUtil.releaseLater;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSplitProcessor {
+
+  @Test
+  public void testFieldSplitProcessor() throws IOException {
+    String data = "abc||de";
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
+
+    final int len = buf.readableBytes();
+    FieldSplitProcessor processor = new FieldSplitProcessor('|');
+
+    assertEquals(3, buf.forEachByte(0, len, processor));
+    assertEquals(4, buf.forEachByte(4, len - 4, processor));
+    assertEquals(-1, buf.forEachByte(5, len - 5, processor));
+
+  }
+
+  @Test
+  public void testLineSplitProcessor() throws IOException {
+    String data = "abc\r\n\n";
+    final ByteBuf buf = releaseLater(
+        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
+
+    final int len = buf.readableBytes();
+    LineSplitProcessor processor = new LineSplitProcessor();
+
+    //find CR
+    assertEquals(3, buf.forEachByte(0, len, processor));
+
+    // find CRLF
+    assertEquals(4, buf.forEachByte(4, len - 4, processor));
+    assertEquals(buf.getByte(4), '\n');
+    // need to skip LF
+    assertTrue(processor.isPrevCharCR());
+
+    // find LF
+    assertEquals(5, buf.forEachByte(5, len - 5, processor)); //line length is 
zero
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java 
b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index dca21af..56cef77 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -121,12 +121,14 @@ public class TestStorages {
   @Parameterized.Parameters
   public static Collection<Object[]> generateParameters() {
     return Arrays.asList(new Object[][] {
+        //type, splitable, statsable, seekable
         {StoreType.CSV, true, true, true},
         {StoreType.RAW, false, false, true},
         {StoreType.RCFILE, true, true, false},
         {StoreType.PARQUET, false, false, false},
         {StoreType.SEQUENCEFILE, true, true, false},
         {StoreType.AVRO, false, false, false},
+        {StoreType.TEXTFILE, true, true, false},
     });
   }
 
@@ -381,7 +383,7 @@ public class TestStorages {
     KeyValueSet options = new KeyValueSet();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
-    meta.putOption(StorageConstants.CSVFILE_NULL, "\\\\N");
+    meta.putOption(StorageConstants.TEXT_NULL, "\\\\N");
     meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N");
     meta.putOption(StorageConstants.RCFILE_SERDE, 
TextSerializerDeserializer.class.getName());
     meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\");

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/storage-default.xml 
b/tajo-storage/src/test/resources/storage-default.xml
index 6bfc902..a81f3d6 100644
--- a/tajo-storage/src/test/resources/storage-default.xml
+++ b/tajo-storage/src/test/resources/storage-default.xml
@@ -28,11 +28,15 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+    <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
   </property>
 
   <!--- Fragment Class Configurations -->
   <property>
+    <name>tajo.storage.fragment.textfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.csv.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
@@ -67,6 +71,11 @@
 
   <!--- Scanner Handler -->
   <property>
+    <name>tajo.storage.scanner-handler.textfile.class</name>
+    
<value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.csv.class</name>
     <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
   </property>
@@ -109,7 +118,12 @@
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+    <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.textfile.class</name>
+    
<value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
   </property>
 
   <property>

Reply via email to