Repository: tajo
Updated Branches:
  refs/heads/master c46dc1a64 -> aa8969ac8


TAJO-1273: Merge DirectRawFile to master branch. (jinho)

Closes #661


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/aa8969ac
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/aa8969ac
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/aa8969ac

Branch: refs/heads/master
Commit: aa8969ac810e3341fb87b41e521f196e425ad069
Parents: c46dc1a
Author: Jinho Kim <[email protected]>
Authored: Wed Jul 29 17:40:31 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Wed Jul 29 17:40:31 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/BuiltinStorages.java   |   1 +
 .../apache/tajo/storage/FSDataInputChannel.java |  16 +-
 .../tajo/storage/LocalFileInputChannel.java     |  23 +-
 .../apache/tajo/storage/SeekableChannel.java    |   5 +-
 .../tajo/storage/SeekableInputChannel.java      |  34 ++
 .../apache/tajo/storage/TableStatistics.java    |   4 +
 .../tajo/tuple/offheap/OffHeapRowBlock.java     |  37 ++
 .../apache/tajo/tuple/offheap/UnSafeTuple.java  |   4 +-
 .../storage/rawfile/DirectRawFileScanner.java   | 219 +++++++++
 .../storage/rawfile/DirectRawFileWriter.java    | 214 +++++++++
 .../tajo/storage/text/DelimitedLineReader.java  |   2 +-
 .../tajo/storage/TestByteBufLineReader.java     |   6 +-
 .../tajo/storage/raw/TestDirectRawFile.java     | 480 +++++++++++++++++++
 14 files changed, 1032 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 1389b6f..91a8346 100644
--- a/CHANGES
+++ b/CHANGES
@@ -370,6 +370,8 @@ Release 0.11.0 - unreleased
   
   TASKS
 
+    TAJO-1273: Merge DirectRawFile to master branch. (jinho)
+
     TAJO-1628: Add a documentation for join operation. (jihoon)
 
     TAJO-1687: sphinx-mavan-plugin version should be 1.0.3. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java 
b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java
index 6f1b7c6..11f0287 100644
--- a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java
+++ b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java
@@ -22,6 +22,7 @@ public class BuiltinStorages {
   public static final String TEXT = "TEXT";
   public static final String JSON = "JSON";
   public static final String RAW = "RAW";
+  public static final String DRAW = "DRAW";
   public static final String RCFILE = "RCFILE";
   public static final String ROW = "ROW";
   public static final String PARQUET = "PARQUET";

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java
index ed84d24..3f638c0 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java
@@ -30,13 +30,14 @@ import java.nio.channels.ReadableByteChannel;
 /**
  * FSDataInputChannel is a NIO channel implementation of direct read ability 
to read from HDFS
  */
-public final class FSDataInputChannel extends InputChannel implements 
SeekableChannel {
+public final class FSDataInputChannel extends SeekableInputChannel {
 
   private ReadableByteChannel channel;
   private FSDataInputStream inputStream;
   private boolean isDirectRead;
+  private long size;
 
-  public FSDataInputChannel(FSDataInputStream inputStream) {
+  public FSDataInputChannel(FSDataInputStream inputStream) throws IOException {
     if (inputStream.getWrappedStream() instanceof ByteBufferReadable) {
       this.isDirectRead = true;
     } else {
@@ -44,6 +45,7 @@ public final class FSDataInputChannel extends InputChannel 
implements SeekableCh
       this.channel = Channels.newChannel(inputStream);
     }
     this.inputStream = inputStream;
+    this.size = inputStream.getPos() + inputStream.available();
   }
 
   @Override
@@ -61,6 +63,16 @@ public final class FSDataInputChannel extends InputChannel 
implements SeekableCh
   }
 
   @Override
+  public long position() throws IOException {
+    return inputStream.getPos();
+  }
+
+  @Override
+  public long size() throws IOException {
+    return size;
+  }
+
+  @Override
   protected void implCloseChannel() throws IOException {
     IOUtils.cleanup(null, channel, inputStream);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java
index bd7d668..fbc4df0 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LocalFileInputChannel.java
@@ -20,6 +20,7 @@ package org.apache.tajo.storage;
 
 import org.apache.hadoop.io.IOUtils;
 
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -27,11 +28,15 @@ import java.nio.channels.FileChannel;
 /**
  * LocalFileInputChannel is a FileChannel wrapper of seek ability
  */
-public final class LocalFileInputChannel extends InputChannel implements 
SeekableChannel {
+public final class LocalFileInputChannel extends SeekableInputChannel {
+  private FileInputStream fileInputStream;
   private FileChannel channel;
+  private long size;
 
-  public LocalFileInputChannel(FileChannel channel) {
-    this.channel = channel;
+  public LocalFileInputChannel(FileInputStream fileInputStream) throws 
IOException {
+    this.fileInputStream = fileInputStream;
+    this.channel = fileInputStream.getChannel();
+    this.size = channel.size();
   }
 
   @Override
@@ -45,7 +50,17 @@ public final class LocalFileInputChannel extends 
InputChannel implements Seekabl
   }
 
   @Override
+  public long position() throws IOException {
+    return channel.position();
+  }
+
+  @Override
+  public long size() throws IOException {
+    return size;
+  }
+
+  @Override
   protected void implCloseChannel() throws IOException {
-    IOUtils.cleanup(null, channel);
+    IOUtils.cleanup(null, channel, fileInputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java
index e788099..61b39f2 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableChannel.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 
 public interface SeekableChannel {
 
-  public abstract void seek(long offset) throws IOException;
+  void seek(long offset) throws IOException;
 
+  long position() throws IOException;
+
+  long size() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableInputChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableInputChannel.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableInputChannel.java
new file mode 100644
index 0000000..bdbc8c0
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableInputChannel.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import java.nio.ByteBuffer;
+
+public abstract class SeekableInputChannel extends InputChannel implements 
SeekableChannel {
+
+  @Override
+  public long read(ByteBuffer[] dsts, int offset, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long read(ByteBuffer[] dsts) {
+    return read(dsts, 0, dsts.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
index c101b0b..aa33ea3 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -69,6 +69,10 @@ public class TableStatistics {
     numRows++;
   }
 
+  public void incrementRows(long num) {
+    numRows += num;
+  }
+
   public long getNumRows() {
     return this.numRows;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
index 689efb7..90d4791 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.SeekableInputChannel;
 import org.apache.tajo.util.Deallocatable;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.SizeOf;
@@ -131,6 +132,42 @@ public class OffHeapRowBlock extends OffHeapMemory 
implements Deallocatable {
     this.rowNum = rowNum;
   }
 
+
+  public boolean copyFromChannel(SeekableInputChannel channel, TableStats 
stats) throws IOException {
+    if (channel.position() < channel.size()) {
+      clear();
+
+      buffer.clear();
+      channel.read(buffer);
+      memorySize = buffer.position();
+
+      while (position < memorySize) {
+        long recordPtr = address + position;
+
+        if (remain() < SizeOf.SIZE_OF_INT) {
+          channel.seek(channel.position() - remain());
+          memorySize = (int) (memorySize - remain());
+          return true;
+        }
+
+        int recordSize = UNSAFE.getInt(recordPtr);
+
+        if (remain() < recordSize) {
+          channel.seek(channel.position() - remain());
+          memorySize = (int) (memorySize - remain());
+          return true;
+        }
+
+        position += recordSize;
+        rowNum++;
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   public boolean copyFromChannel(FileChannel channel, TableStats stats) throws 
IOException {
     if (channel.position() < channel.size()) {
       clear();

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
index 4ccba7b..fd427ca 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -100,7 +100,7 @@ public abstract class UnSafeTuple implements Tuple {
   }
 
   private int getFieldOffset(int fieldId) {
-    return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + 
(fieldId * SizeOf.SIZE_OF_INT));
+    return UNSAFE.getInt(bb.address() + (long)(relativePos + 
SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)));
   }
 
   public long getFieldAddr(int fieldId) {
@@ -278,7 +278,7 @@ public abstract class UnSafeTuple implements Tuple {
   public Datum getProtobufDatum(int fieldId) {
     byte [] bytes = getBytes(fieldId);
 
-    ProtobufDatumFactory factory = 
ProtobufDatumFactory.get(types[fieldId].getCode());
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId]);
     Message.Builder builder = factory.newBuilder();
     try {
       builder.mergeFrom(bytes);

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
new file mode 100644
index 0000000..8ae9a26
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rawfile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.unit.StorageUnit;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+public class DirectRawFileScanner extends FileScanner implements 
SeekableScanner {
+  private static final Log LOG = LogFactory.getLog(DirectRawFileScanner.class);
+
+  private SeekableInputChannel channel;
+  private TajoDataTypes.DataType[] columnTypes;
+
+  private boolean eof = false;
+  private long fileSize;
+  private long recordCount;
+
+  private ZeroCopyTuple unSafeTuple = new ZeroCopyTuple();
+  private OffHeapRowBlock tupleBuffer;
+  private OffHeapRowBlockReader reader;
+
+  public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta 
meta, FileFragment fragment) throws IOException {
+    super(conf, schema, meta, fragment);
+  }
+
+  public void init() throws IOException {
+    initChannel();
+
+    columnTypes = new TajoDataTypes.DataType[schema.size()];
+    for (int i = 0; i < schema.size(); i++) {
+      columnTypes[i] = schema.getColumn(i).getDataType();
+    }
+
+    tupleBuffer = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+    reader = new OffHeapRowBlockReader(tupleBuffer);
+
+    fetchNeeded = !next(tupleBuffer);
+
+    super.init();
+  }
+
+  private void initChannel() throws IOException {
+    FileSystem fs = FileScanner.getFileSystem((TajoConf) conf, 
fragment.getPath());
+
+    if (fs instanceof LocalFileSystem) {
+      File file;
+      try {
+        if (fragment.getPath().toUri().getScheme() != null) {
+          file = new File(fragment.getPath().toUri());
+        } else {
+          file = new File(fragment.getPath().toString());
+        }
+      } catch (IllegalArgumentException iae) {
+        throw new IOException(iae);
+      }
+
+      channel = new LocalFileInputChannel(new FileInputStream(file));
+      channel.seek(fragment.getStartKey());
+      fileSize = channel.size();
+    } else {
+      channel = new FSDataInputChannel(fs.open(fragment.getPath()));
+      channel.seek(fragment.getStartKey());
+      fileSize = channel.size();
+    }
+
+    if (tableStats != null) {
+      tableStats.setNumBytes(fileSize);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RawFileScanner open:" + fragment.getPath() + ", offset :" +
+          fragment.getStartKey() + ", file size :" + fileSize);
+    }
+  }
+
+  @Override
+  public long getNextOffset() throws IOException {
+    return channel.position() - reader.remainForRead();
+  }
+
+  @Override
+  public void seek(long offset) throws IOException {
+    channel.seek(offset);
+    fetchNeeded = true;
+  }
+
+  public boolean next(OffHeapRowBlock rowblock) throws IOException {
+    return rowblock.copyFromChannel(channel, tableStats);
+  }
+
+  private boolean fetchNeeded = true;
+
+  @Override
+  public Tuple next() throws IOException {
+    if(eof) {
+      return null;
+    }
+
+    while(true) {
+      if (fetchNeeded) {
+        if (!next(tupleBuffer)) {
+          return null;
+        }
+        reader.reset();
+      }
+
+      fetchNeeded = !reader.next(unSafeTuple);
+
+      if (!fetchNeeded) {
+        recordCount++;
+        return unSafeTuple;
+      }
+    }
+  }
+
+  @Override
+  public void reset() throws IOException {
+    // reload initial buffer
+    seek(0);
+    eof = false;
+    reader.reset();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (tableStats != null) {
+      tableStats.setReadBytes(fileSize);
+      tableStats.setNumRows(recordCount);
+    }
+    tupleBuffer.release();
+    tupleBuffer = null;
+    reader = null;
+
+    IOUtils.cleanup(LOG, channel);
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return false;
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  @Override
+  public void setFilter(EvalNode filter) {
+
+  }
+
+  @Override
+  public boolean isSplittable(){
+    return false;
+  }
+
+  @Override
+  public float getProgress() {
+    if(!inited) return 0.0f;
+
+    try {
+      tableStats.setNumRows(recordCount);
+      long filePos = 0;
+      if (channel != null) {
+        filePos = channel.position();
+        tableStats.setReadBytes(filePos);
+      }
+
+      if(eof || channel == null) {
+        tableStats.setReadBytes(fileSize);
+        return 1.0f;
+      }
+
+      if (filePos == 0) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, ((float)filePos / (float)fileSize));
+      }
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      return 0.0f;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
new file mode 100644
index 0000000..bb81d6e
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rawfile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.storage.FileAppender;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.BaseTupleBuilder;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.UnSafeTuple;
+import org.apache.tajo.unit.StorageUnit;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+public class DirectRawFileWriter extends FileAppender {
+  public static final String FILE_EXTENSION = "draw";
+  private static final Log LOG = LogFactory.getLog(DirectRawFileWriter.class);
+
+  private FileChannel channel;
+  private RandomAccessFile randomAccessFile;
+  private FSDataOutputStream fos;
+  private TajoDataTypes.DataType[] columnTypes;
+  private boolean isLocal;
+  private long pos;
+
+  private TableStatistics stats;
+
+  private BaseTupleBuilder builder;
+
+  public DirectRawFileWriter(Configuration conf, TaskAttemptId taskAttemptId,
+                             final Schema schema, final TableMeta meta, final 
Path path) throws IOException {
+    super(conf, taskAttemptId, schema, meta, path);
+  }
+
+  @Override
+  public void init() throws IOException {
+    File file;
+    FileSystem fs = path.getFileSystem(conf);
+
+    if (fs instanceof LocalFileSystem) {
+      try {
+        if (path.toUri().getScheme() != null) {
+          file = new File(path.toUri());
+        } else {
+          file = new File(path.toString());
+        }
+      } catch (IllegalArgumentException iae) {
+        throw new IOException(iae);
+      }
+
+      randomAccessFile = new RandomAccessFile(file, "rw");
+      channel = randomAccessFile.getChannel();
+      isLocal = true;
+    } else {
+      fos = fs.create(path, true);
+      isLocal = false;
+    }
+
+    pos = 0;
+    columnTypes = new TajoDataTypes.DataType[schema.size()];
+    for (int i = 0; i < schema.size(); i++) {
+      columnTypes[i] = schema.getColumn(i).getDataType();
+    }
+
+    if (enabledStats) {
+      this.stats = new TableStatistics(this.schema);
+    }
+
+    builder = new BaseTupleBuilder(schema);
+
+    super.init();
+  }
+
+  @Override
+  public long getOffset() throws IOException {
+    return pos;
+  }
+
+  private long getFilePosition() throws IOException {
+    if (isLocal) {
+      return channel.position();
+    } else {
+      return fos.getPos();
+    }
+  }
+
+  public void writeRowBlock(OffHeapRowBlock rowBlock) throws IOException {
+    write(rowBlock.nioBuffer());
+    if (enabledStats) {
+      stats.incrementRows(rowBlock.rows());
+    }
+
+    pos = getFilePosition();
+  }
+
+  private ByteBuffer buffer;
+  private void ensureSize(int size) throws IOException {
+    if (buffer.remaining() < size) {
+
+      buffer.limit(buffer.position());
+      buffer.flip();
+      write(buffer);
+
+      buffer.clear();
+    }
+  }
+
+  private void write(ByteBuffer buffer) throws IOException {
+    if(isLocal) {
+      channel.write(buffer);
+    } else {
+      byte[] bytes = new byte[buffer.remaining()];
+      buffer.get(bytes);
+      fos.write(bytes);
+    }
+  }
+
+  @Override
+  public void addTuple(Tuple t) throws IOException {
+    if (enabledStats) {
+      for (int i = 0; i < schema.size(); i++) {
+        stats.analyzeField(i, t);
+      }
+    }
+
+    if (buffer == null) {
+      buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB);
+    }
+
+    UnSafeTuple unSafeTuple;
+
+    if (!(t instanceof UnSafeTuple)) {
+      RowStoreUtil.convert(t, builder);
+      unSafeTuple = builder.buildToZeroCopyTuple();
+    } else {
+      unSafeTuple = (UnSafeTuple) t;
+    }
+
+    ByteBuffer bb = unSafeTuple.nioBuffer();
+    ensureSize(bb.limit());
+    buffer.put(bb);
+
+    pos = getFilePosition() + (buffer.limit() - buffer.remaining());
+
+    if (enabledStats) {
+      stats.incrementRow();
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (buffer != null) {
+      buffer.limit(buffer.position());
+      buffer.flip();
+      write(buffer);
+      buffer.clear();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+    if (enabledStats) {
+      stats.setNumBytes(getOffset());
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + 
path);
+    }
+
+    IOUtils.cleanup(LOG, channel, randomAccessFile, fos);
+  }
+
+  @Override
+  public TableStats getStats() {
+    if (enabledStats) {
+      stats.setNumBytes(pos);
+      return stats.getTableStat();
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
index b73f96b..0443308 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -112,7 +112,7 @@ public class DelimitedLineReader implements Closeable {
         FileChannel channel = inputStream.getChannel();
         channel.position(startOffset);
         is = inputStream;
-        lineReader = new ByteBufLineReader(new LocalFileInputChannel(channel),
+        lineReader = new ByteBufLineReader(new 
LocalFileInputChannel(inputStream),
             BufferPool.directBuffer((int) Math.min(bufferSize, end)));
       } else {
         fis = fs.open(fragment.getPath());

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java
index d127a9e..b6f65df 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestByteBufLineReader.java
@@ -34,7 +34,6 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
 import java.util.UUID;
 
@@ -144,9 +143,7 @@ public class TestByteBufLineReader {
     assertTrue(file.exists());
 
     FileInputStream inputStream = new FileInputStream(file);
-    FileChannel channel = inputStream.getChannel();
-
-    ByteBufLineReader lineReader = new ByteBufLineReader(new 
LocalFileInputChannel(channel));
+    ByteBufLineReader lineReader = new ByteBufLineReader(new 
LocalFileInputChannel(inputStream));
 
     assertEquals(LINE, lineReader.readLine());
     lineReader.seek(0);
@@ -154,7 +151,6 @@ public class TestByteBufLineReader {
     assertNull(lineReader.readLine());
 
     lineReader.close();
-    channel.close();
     inputStream.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/aa8969ac/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
new file mode 100644
index 0000000..46c0d6e
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
@@ -0,0 +1,480 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.raw;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tajo.BuiltinStorages;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rawfile.DirectRawFileScanner;
+import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.RowWriter;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.ProtoUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestDirectRawFile {
+  private static final Log LOG = LogFactory.getLog(TestDirectRawFile.class);
+  public static String UNICODE_FIELD_PREFIX = "abc_가나다_";
+  public static Schema schema;
+
+  private static String TEST_PATH = "target/test-data/TestDirectRawFile";
+  private static MiniDFSCluster cluster;
+  private static FileSystem dfs;
+  private static FileSystem localFs;
+
+  private TajoConf tajoConf;
+  private Path testDir;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() throws IOException {
+    return Arrays.asList(new Object[][]{
+        {false},
+        {true}
+    });
+  }
+
+
+  public TestDirectRawFile(boolean isLocal) throws IOException {
+    FileSystem fs;
+    if (isLocal) {
+      fs = localFs;
+    } else {
+      fs = dfs;
+    }
+
+    this.tajoConf = new TajoConf(fs.getConf());
+    this.testDir = getTestDir(fs, TEST_PATH);
+  }
+
+  @BeforeClass
+  public static void setUpClass() throws IOException, InterruptedException {
+    final Configuration conf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
+
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new 
HdfsConfiguration(conf));
+    builder.numDataNodes(1);
+    builder.format(true);
+    builder.manageNameDfsDirs(true);
+    builder.manageDataDfsDirs(true);
+    builder.waitSafeMode(true);
+    cluster = builder.build();
+
+    cluster.waitClusterUp();
+    dfs = cluster.getFileSystem();
+    localFs = FileSystem.getLocal(new TajoConf());
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws InterruptedException {
+    cluster.shutdown(true);
+  }
+
+  public Path getTestDir(FileSystem fs, String dir) throws IOException {
+    Path path = new Path(dir);
+    if(fs.exists(path))
+      fs.delete(path, true);
+
+    fs.mkdirs(path);
+
+    return fs.makeQualified(path);
+  }
+
+  static {
+    schema = new Schema();
+    schema.addColumn("col0", TajoDataTypes.Type.BOOLEAN);
+    schema.addColumn("col1", TajoDataTypes.Type.INT2);
+    schema.addColumn("col2", TajoDataTypes.Type.INT4);
+    schema.addColumn("col3", TajoDataTypes.Type.INT8);
+    schema.addColumn("col4", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col5", TajoDataTypes.Type.FLOAT8);
+    schema.addColumn("col6", TajoDataTypes.Type.TEXT);
+    schema.addColumn("col7", TajoDataTypes.Type.TIMESTAMP);
+    schema.addColumn("col8", TajoDataTypes.Type.DATE);
+    schema.addColumn("col9", TajoDataTypes.Type.TIME);
+    schema.addColumn("col10", TajoDataTypes.Type.INTERVAL);
+    schema.addColumn("col11", TajoDataTypes.Type.INET4);
+    schema.addColumn("col12",
+        CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, 
PrimitiveProtos.StringProto.class.getName()));
+  }
+
+  public FileStatus writeRowBlock(TajoConf conf, TableMeta meta, 
OffHeapRowBlock rowBlock, Path outputFile)
+      throws IOException {
+    DirectRawFileWriter writer = new DirectRawFileWriter(conf, null, schema, 
meta, outputFile);
+    writer.init();
+    writer.writeRowBlock(rowBlock);
+    writer.close();
+
+    FileStatus status = 
outputFile.getFileSystem(conf).getFileStatus(outputFile);
+    assertTrue(status.getLen() > 0);
+    LOG.info("Written file size: " + 
FileUtil.humanReadableByteCount(status.getLen(), false));
+    return status;
+  }
+
+  public FileStatus writeRowBlock(TajoConf conf, TableMeta meta, 
OffHeapRowBlock rowBlock) throws IOException {
+    Path outputDir = new Path(testDir, UUID.randomUUID() + "");
+    outputDir.getFileSystem(conf).mkdirs(outputDir);
+    Path outputFile = new Path(outputDir, "output.draw");
+    return writeRowBlock(conf, meta, rowBlock, outputFile);
+  }
+
+  @Test
+  public void testRWForAllTypesWithNextTuple() throws IOException {
+    int rowNum = 10000;
+
+    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
+
+    TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW);
+    FileStatus outputFile = writeRowBlock(tajoConf, meta, rowBlock);
+    rowBlock.release();
+
+    FileFragment fragment =
+        new FileFragment("testRWForAllTypesWithNextTuple", 
outputFile.getPath(), 0, outputFile.getLen());
+    DirectRawFileScanner reader = new DirectRawFileScanner(tajoConf, schema, 
meta, fragment);
+    reader.init();
+
+    long readStart = System.currentTimeMillis();
+    int j = 0;
+    Tuple tuple;
+    while ((tuple = reader.next()) != null) {
+      validateTupleResult(j, tuple);
+      j++;
+    }
+
+    LOG.info("Total read rows: " + j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+    reader.close();
+    assertEquals(rowNum, j);
+  }
+
+  @Test
+  public void testRepeatedScan() throws IOException {
+    int rowNum = 2;
+
+    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
+    TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW);
+    FileStatus outputFile = writeRowBlock(tajoConf, meta, rowBlock);
+
+    rowBlock.release();
+
+    FileFragment fragment =
+        new FileFragment("testRepeatedScan", outputFile.getPath(), 0, 
outputFile.getLen());
+    DirectRawFileScanner reader = new DirectRawFileScanner(tajoConf, schema, 
meta, fragment);
+    reader.init();
+
+    int j = 0;
+    while (reader.next() != null) {
+      j++;
+    }
+    assertEquals(rowNum, j);
+
+    for (int i = 0; i < 5; i++) {
+      assertNull(reader.next());
+    }
+
+    reader.close();
+  }
+
+  @Test
+  public void testReset() throws IOException {
+    int rowNum = 2;
+
+    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
+
+    TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW);
+    FileStatus outputFile = writeRowBlock(tajoConf, meta, rowBlock);
+    rowBlock.release();
+
+    FileFragment fragment =
+        new FileFragment("testReset", outputFile.getPath(), 0, 
outputFile.getLen());
+    DirectRawFileScanner reader = new DirectRawFileScanner(tajoConf, schema, 
meta, fragment);
+    reader.init();
+
+    int j = 0;
+    while (reader.next() != null) {
+      j++;
+    }
+    assertEquals(rowNum, j);
+
+    for (int i = 0; i < 5; i++) {
+      assertNull(reader.next());
+    }
+
+    reader.reset();
+
+    j = 0;
+    while (reader.next() != null) {
+      j++;
+    }
+    assertEquals(rowNum, j);
+
+    for (int i = 0; i < 5; i++) {
+      assertNull(reader.next());
+    }
+    reader.close();
+  }
+
+  public static OffHeapRowBlock createRowBlock(int rowNum) {
+    long allocateStart = System.currentTimeMillis();
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
+    long allocatedEnd = System.currentTimeMillis();
+    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes 
allocated "
+        + (allocatedEnd - allocateStart) + " msec");
+
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      fillRow(i, rowBlock.getWriter());
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing takes " + (writeEnd - writeStart) + " msec");
+
+    return rowBlock;
+  }
+
+  public static void fillRow(int i, RowWriter builder) {
+    builder.startRow();
+    builder.putBool(i % 1 == 0 ? true : false); // 0
+    builder.putInt2((short) 1);                 // 1
+    builder.putInt4(i);                         // 2
+    builder.putInt8(i);                         // 3
+    builder.putFloat4(i);                       // 4
+    builder.putFloat8(i);                       // 5
+    builder.putText((UNICODE_FIELD_PREFIX + i).getBytes());  // 6
+    builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 
08:48:00").asInt8() + i); // 7
+    builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
+    builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
+    builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
+    builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 
11
+    builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); 
// 12
+    builder.endRow();
+  }
+
+  public static void validateTupleResult(int j, Tuple t) {
+    assertTrue((j % 1 == 0) == t.getBool(0));
+    assertTrue(1 == t.getInt2(1));
+    assertEquals(j, t.getInt4(2));
+    assertEquals(j, t.getInt8(3));
+    assertTrue(j == t.getFloat4(4));
+    assertTrue(j == t.getFloat8(5));
+    assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6));
+    assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() 
+ (long) j, t.getInt8(7));
+    assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, 
t.getInt4(8));
+    assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, 
t.getInt8(9));
+    assertEquals(DatumFactory.createInterval((j + 1) + " hours"), 
t.getInterval(10));
+    assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, 
t.getInt4(11));
+    assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), 
t.getProtobufDatum(12));
+  }
+
+  public static void fillRowBlockWithNull(int i, RowWriter writer) {
+    writer.startRow();
+
+    if (i == 0) {
+      writer.skipField();
+    } else {
+      writer.putBool(i % 1 == 0 ? true : false); // 0
+    }
+    if (i % 1 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInt2((short) 1);                 // 1
+    }
+
+    if (i % 2 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInt4(i);                         // 2
+    }
+
+    if (i % 3 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInt8(i);                         // 3
+    }
+
+    if (i % 4 == 0) {
+      writer.skipField();
+    } else {
+      writer.putFloat4(i);                       // 4
+    }
+
+    if (i % 5 == 0) {
+      writer.skipField();
+    } else {
+      writer.putFloat8(i);                       // 5
+    }
+
+    if (i % 6 == 0) {
+      writer.skipField();
+    } else {
+      writer.putText((UNICODE_FIELD_PREFIX + i).getBytes());  // 6
+    }
+
+    if (i % 7 == 0) {
+      writer.skipField();
+    } else {
+      writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 
08:48:00").asInt8() + i); // 7
+    }
+
+    if (i % 8 == 0) {
+      writer.skipField();
+    } else {
+      writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
+    }
+
+    if (i % 9 == 0) {
+      writer.skipField();
+    } else {
+      writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
+    }
+
+    if (i % 10 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 
10
+    }
+
+    if (i % 11 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); 
// 11
+    }
+
+    if (i % 12 == 0) {
+      writer.skipField();
+    } else {
+      writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + 
""))); // 12
+    }
+
+    writer.endRow();
+  }
+
+  public static void validateNullity(int j, Tuple tuple) {
+    if (j == 0) {
+      tuple.isBlankOrNull(0);
+    } else {
+      assertTrue((j % 1 == 0) == tuple.getBool(0));
+    }
+
+    if (j % 1 == 0) {
+      tuple.isBlankOrNull(1);
+    } else {
+      assertTrue(1 == tuple.getInt2(1));
+    }
+
+    if (j % 2 == 0) {
+      tuple.isBlankOrNull(2);
+    } else {
+      assertEquals(j, tuple.getInt4(2));
+    }
+
+    if (j % 3 == 0) {
+      tuple.isBlankOrNull(3);
+    } else {
+      assertEquals(j, tuple.getInt8(3));
+    }
+
+    if (j % 4 == 0) {
+      tuple.isBlankOrNull(4);
+    } else {
+      assertTrue(j == tuple.getFloat4(4));
+    }
+
+    if (j % 5 == 0) {
+      tuple.isBlankOrNull(5);
+    } else {
+      assertTrue(j == tuple.getFloat8(5));
+    }
+
+    if (j % 6 == 0) {
+      tuple.isBlankOrNull(6);
+    } else {
+      assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6));
+    }
+
+    if (j % 7 == 0) {
+      tuple.isBlankOrNull(7);
+    } else {
+      assertEquals(DatumFactory.createTimestamp("2014-04-16 
08:48:00").asInt8() + (long) j, tuple.getInt8(7));
+    }
+
+    if (j % 8 == 0) {
+      tuple.isBlankOrNull(8);
+    } else {
+      assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, 
tuple.getInt4(8));
+    }
+
+    if (j % 9 == 0) {
+      tuple.isBlankOrNull(9);
+    } else {
+      assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, 
tuple.getInt8(9));
+    }
+
+    if (j % 10 == 0) {
+      tuple.isBlankOrNull(10);
+    } else {
+      assertEquals(DatumFactory.createInterval((j + 1) + " hours"), 
tuple.getInterval(10));
+    }
+
+    if (j % 11 == 0) {
+      tuple.isBlankOrNull(11);
+    } else {
+      assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, 
tuple.getInt4(11));
+    }
+
+    if (j % 12 == 0) {
+      tuple.isBlankOrNull(12);
+    } else {
+      assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), 
tuple.getProtobufDatum(12));
+    }
+  }
+}

Reply via email to