Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.2 3f994bf2e -> a9e8cbc4f


http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java
index e520082..fe1bcf6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.storage.thirdparty.orc;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 import com.google.protobuf.ByteString;
@@ -30,23 +29,20 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.thirdparty.orc.CompressionCodec.Modifier;
-import org.apache.tajo.storage.thirdparty.orc.OrcProto.RowIndexEntry;
-import org.apache.tajo.storage.thirdparty.orc.OrcProto.StripeStatistics;
-import org.apache.tajo.storage.thirdparty.orc.OrcProto.Type;
-import org.apache.tajo.storage.thirdparty.orc.OrcProto.UserMetadataItem;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hadoop.hive.serde2.objectinspector.*;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.io.Text;
+import org.apache.orc.*;
+import org.apache.orc.CompressionCodec.Modifier;
+import org.apache.orc.OrcProto.RowIndexEntry;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.impl.*;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.Inet4Datum;
+import org.apache.tajo.datum.Int4Datum;
+import org.apache.tajo.datum.Int8Datum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.thirdparty.orc.OrcFile.*;
+import org.apache.tajo.util.datetime.DateTimeConstants;
 import org.apache.tajo.util.datetime.DateTimeUtil;
 
 import java.io.IOException;
@@ -96,10 +92,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private final boolean addBlockPadding;
   private final int bufferSize;
   private final long blockSize;
-  private final float paddingTolerance;
+  private final double paddingTolerance;
+  private final TypeDescription schema;
+
   // the streams that make up the current stripe
-  private final Map<StreamName, BufferedStream> streams =
-    new TreeMap<StreamName, BufferedStream>();
+  private final Map<StreamName, BufferedStream> streams = new TreeMap<>();
 
   private FSDataOutputStream rawWriter = null;
   // the compressed metadata information outStream
@@ -113,47 +110,32 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private long rawDataSize = 0;
   private int rowsInIndex = 0;
   private int stripesAtLastFlush = -1;
-  private final List<OrcProto.StripeInformation> stripes =
-    new ArrayList<OrcProto.StripeInformation>();
-  private final Map<String, ByteString> userMetadata =
-    new TreeMap<String, ByteString>();
+  private final List<OrcProto.StripeInformation> stripes = new ArrayList<>();
+  private final Map<String, ByteString> userMetadata = new TreeMap<>();
+  private final StreamFactory streamFactory = new StreamFactory();
   private final TreeWriter treeWriter;
   private final boolean buildIndex;
   private final MemoryManager memoryManager;
-  private final OrcFile.Version version;
+  private final Version version;
   private final Configuration conf;
-  private final OrcFile.WriterCallback callback;
-  private final OrcFile.WriterContext callbackContext;
-  private final OrcFile.EncodingStrategy encodingStrategy;
-  private final OrcFile.CompressionStrategy compressionStrategy;
+  private final WriterCallback callback;
+  private final WriterContext callbackContext;
+  private final EncodingStrategy encodingStrategy;
+  private final CompressionStrategy compressionStrategy;
   private final boolean[] bloomFilterColumns;
   private final double bloomFilterFpp;
   private boolean writeTimeZone;
   private TimeZone timeZone;
 
-  WriterImpl(FileSystem fs,
-      Path path,
-      Configuration conf,
-      ObjectInspector inspector,
-      long stripeSize,
-      CompressionKind compress,
-      int bufferSize,
-      int rowIndexStride,
-      MemoryManager memoryManager,
-      boolean addBlockPadding,
-      OrcFile.Version version,
-      OrcFile.WriterCallback callback,
-      OrcFile.EncodingStrategy encodingStrategy,
-      OrcFile.CompressionStrategy compressionStrategy,
-      float paddingTolerance,
-      long blockSizeValue,
-      String bloomFilterColumnNames,
-      double bloomFilterFpp,
-      TimeZone timeZone) throws IOException {
+  public WriterImpl(FileSystem fs,
+                    Path path,
+                    OrcFile.WriterOptions opts,
+                    TimeZone timeZone) throws IOException {
     this.fs = fs;
     this.path = path;
-    this.conf = conf;
-    this.callback = callback;
+    this.conf = opts.getConfiguration();
+    this.callback = opts.getCallback();
+    this.schema = opts.getSchema();
     if (callback != null) {
       callbackContext = new OrcFile.WriterContext(){
 
@@ -165,100 +147,60 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     } else {
       callbackContext = null;
     }
-    this.adjustedStripeSize = stripeSize;
-    this.defaultStripeSize = stripeSize;
-    this.version = version;
-    this.encodingStrategy = encodingStrategy;
-    this.compressionStrategy = compressionStrategy;
-    this.addBlockPadding = addBlockPadding;
-    this.blockSize = blockSizeValue;
-    this.paddingTolerance = paddingTolerance;
-    this.compress = compress;
-    this.rowIndexStride = rowIndexStride;
-    this.memoryManager = memoryManager;
-    this.timeZone = timeZone;
+    this.adjustedStripeSize = opts.getStripeSize();
+    this.defaultStripeSize = opts.getStripeSize();
+    this.version = opts.getVersion();
+    this.encodingStrategy = opts.getEncodingStrategy();
+    this.compressionStrategy = opts.getCompressionStrategy();
+    this.addBlockPadding = opts.getBlockPadding();
+    this.blockSize = opts.getBlockSize();
+    this.paddingTolerance = opts.getPaddingTolerance();
+    this.compress = opts.getCompress();
+    this.rowIndexStride = opts.getRowIndexStride();
+    this.memoryManager = opts.getMemoryManager();
     buildIndex = rowIndexStride > 0;
     codec = createCodec(compress);
-    String allColumns = conf.get(IOConstants.COLUMNS);
-    if (allColumns == null) {
-      allColumns = getColumnNamesFromInspector(inspector);
-    }
-    this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize);
+    int numColumns = schema.getMaximumId() + 1;
+    this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
+        numColumns, opts.getBufferSize());
     if (version == OrcFile.Version.V_0_11) {
       /* do not write bloom filters for ORC v11 */
-      this.bloomFilterColumns =
-          OrcUtils.includeColumns(null, allColumns, inspector);
+      this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
     } else {
       this.bloomFilterColumns =
-          OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector);
+          OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
     }
-    this.bloomFilterFpp = bloomFilterFpp;
-    treeWriter = createTreeWriter(inspector, new StreamFactory(), false);
+    this.bloomFilterFpp = opts.getBloomFilterFpp();
+    this.timeZone = timeZone;
+    treeWriter = createTreeWriter(schema, streamFactory, false);
     if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
       throw new IllegalArgumentException("Row stride must be at least " +
           MIN_ROW_INDEX_STRIDE);
     }
 
     // ensure that we are able to handle callbacks before we register ourselves
-    memoryManager.addWriter(path, stripeSize, this);
-  }
-
-  private String getColumnNamesFromInspector(ObjectInspector inspector) {
-    List<String> fieldNames = Lists.newArrayList();
-    Joiner joiner = Joiner.on(",");
-    if (inspector instanceof StructObjectInspector) {
-      StructObjectInspector soi = (StructObjectInspector) inspector;
-      List<? extends StructField> fields = soi.getAllStructFieldRefs();
-      for(StructField sf : fields) {
-        fieldNames.add(sf.getFieldName());
-      }
-    }
-    return joiner.join(fieldNames);
+    memoryManager.addWriter(path, opts.getStripeSize(), this);
   }
 
   @VisibleForTesting
-  int getEstimatedBufferSize(int bs) {
-      return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs);
-  }
-
-  int getEstimatedBufferSize(String colNames, int bs) {
-    long availableMem = getMemoryAvailableForORC();
-    if (colNames != null) {
-      final int numCols = colNames.split(",").length;
-      if (numCols > COLUMN_COUNT_THRESHOLD) {
-        // In BufferedStream, there are 3 outstream buffers (compressed,
-        // uncompressed and overflow) and list of previously compressed buffers.
-        // Since overflow buffer is rarely used, lets consider only 2 allocation.
-        // Also, initially, the list of compression buffers will be empty.
-        final int outStreamBuffers = codec == null ? 1 : 2;
-
-        // max possible streams per column is 5. For string columns, there is
-        // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams.
-        final int maxStreams = 5;
-
-        // Lets assume 10% memory for holding dictionary in memory and other
-        // object allocations
-        final long miscAllocation = (long) (0.1f * availableMem);
-
-        // compute the available memory
-        final long remainingMem = availableMem - miscAllocation;
-
-        int estBufferSize = (int) (remainingMem /
-            (maxStreams * outStreamBuffers * numCols));
-        estBufferSize = getClosestBufferSize(estBufferSize, bs);
-        if (estBufferSize > bs) {
-          estBufferSize = bs;
-        }
-
-        LOG.info("WIDE TABLE - Number of columns: " + numCols +
-            " Chosen compression buffer size: " + estBufferSize);
-        return estBufferSize;
-      }
+  public static int getEstimatedBufferSize(long stripeSize, int numColumns,
+                                           int bs) {
+    // The worst case is that there are 2 big streams per a column and
+    // we want to guarantee that each stream gets ~10 buffers.
+    // This keeps buffers small enough that we don't get really small stripe
+    // sizes.
+    int estBufferSize = (int) (stripeSize / (20 * numColumns));
+    estBufferSize = getClosestBufferSize(estBufferSize);
+    if (estBufferSize > bs) {
+      estBufferSize = bs;
+    } else {
+      LOG.info("WIDE TABLE - Number of columns: " + numColumns +
+          " Chosen compression buffer size: " + estBufferSize);
     }
-    return bs;
+    return estBufferSize;
   }
 
-  private int getClosestBufferSize(int estBufferSize, int bs) {
+  private static int getClosestBufferSize(int estBufferSize) {
     final int kb4 = 4 * 1024;
     final int kb8 = 8 * 1024;
     final int kb16 = 16 * 1024;
@@ -618,8 +560,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
    */
   private abstract static class TreeWriter {
     protected final int id;
-    protected final ObjectInspector inspector;
-    private final BitFieldWriter isPresent;
+    protected final BitFieldWriter isPresent;
     private final boolean isCompressed;
     protected final ColumnStatisticsImpl indexStatistics;
     protected final ColumnStatisticsImpl stripeColStatistics;
@@ -636,24 +577,24 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     private final OrcProto.BloomFilter.Builder bloomFilterEntry;
     private boolean foundNulls;
     private OutStream isPresentOutStream;
-    private final List<StripeStatistics.Builder> stripeStatsBuilders;
+    private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders;
     private final StreamFactory streamFactory;
 
     /**
      * Create a tree writer.
      * @param columnId the column id of the column to write
-     * @param inspector the object inspector to use
+     * @param schema the row schema
      * @param streamFactory limited access to the Writer's data.
      * @param nullable can the value be null?
      * @throws IOException
      */
-    TreeWriter(int columnId, ObjectInspector inspector,
+    TreeWriter(int columnId,
+               TypeDescription schema,
                StreamFactory streamFactory,
                boolean nullable) throws IOException {
       this.streamFactory = streamFactory;
       this.isCompressed = streamFactory.isCompressed();
       this.id = columnId;
-      this.inspector = inspector;
       if (nullable) {
         isPresentOutStream = streamFactory.createStream(id,
             OrcProto.Stream.Kind.PRESENT);
@@ -663,9 +604,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       }
       this.foundNulls = false;
       createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
-      indexStatistics = ColumnStatisticsImpl.create(inspector);
-      stripeColStatistics = ColumnStatisticsImpl.create(inspector);
-      fileStatistics = ColumnStatisticsImpl.create(inspector);
+      indexStatistics = ColumnStatisticsImpl.create(schema);
+      stripeColStatistics = ColumnStatisticsImpl.create(schema);
+      fileStatistics = ColumnStatisticsImpl.create(schema);
       childrenWriters = new TreeWriter[0];
       rowIndex = OrcProto.RowIndex.newBuilder();
       rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
@@ -914,10 +855,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     private final BitFieldWriter writer;
 
     BooleanTreeWriter(int columnId,
-                      ObjectInspector inspector,
+                      TypeDescription schema,
                       StreamFactory writer,
                       boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, schema, writer, nullable);
       PositionedOutputStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.writer = new BitFieldWriter(out, 1);
@@ -929,7 +870,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.write(datum);
       if (datum != null && datum.isNotNull()) {
         boolean val = datum.asBool();
-        indexStatistics.updateBoolean(val);
+        indexStatistics.updateBoolean(val, 1);
         writer.write(val ? 1 : 0);
       }
     }
@@ -953,10 +894,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     private final RunLengthByteWriter writer;
 
     ByteTreeWriter(int columnId,
-                      ObjectInspector inspector,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+                   TypeDescription schema,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
       this.writer = new RunLengthByteWriter(writer.createStream(id,
           OrcProto.Stream.Kind.DATA));
       recordPosition(rowIndexPosition);
@@ -967,7 +908,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.write(datum);
       if (datum != null && datum.isNotNull()) {
         byte val = datum.asByte();
-        indexStatistics.updateInteger(val);
+        indexStatistics.updateInteger(val, 1);
         if (createBloomFilter) {
           bloomFilter.addLong(val);
         }
@@ -995,10 +936,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     private boolean isDirectV2 = true;
 
     IntegerTreeWriter(int columnId,
-                      ObjectInspector inspector,
+                      TypeDescription schema,
                       StreamFactory writer,
                       boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, schema, writer, nullable);
       OutStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
@@ -1028,7 +969,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         } else {
           val = datum.asInt2();
         }
-        indexStatistics.updateInteger(val);
+        indexStatistics.updateInteger(val, 1);
         if (createBloomFilter) {
           // integers are converted to longs in column statistics and during SARG evaluation
           bloomFilter.addLong(val);
@@ -1057,10 +998,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     private final SerializationUtils utils;
 
     FloatTreeWriter(int columnId,
-                      ObjectInspector inspector,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+                    TypeDescription schema,
+                    StreamFactory writer,
+                    boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.utils = new SerializationUtils();
@@ -1101,10 +1042,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     private final SerializationUtils utils;
 
     DoubleTreeWriter(int columnId,
-                    ObjectInspector inspector,
-                    StreamFactory writer,
-                    boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.utils = new SerializationUtils();
@@ -1139,33 +1080,33 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
-  private static class StringTreeWriter extends TreeWriter {
+  private static abstract class StringBaseTreeWriter extends TreeWriter {
     private static final int INITIAL_DICTIONARY_SIZE = 4096;
     private final OutStream stringOutput;
     private final IntegerWriter lengthOutput;
     private final IntegerWriter rowOutput;
-    private final StringRedBlackTree dictionary =
+    protected final StringRedBlackTree dictionary =
         new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
-    private final DynamicIntArray rows = new DynamicIntArray();
-    private final PositionedOutputStream directStreamOutput;
-    private final IntegerWriter directLengthOutput;
+    protected final DynamicIntArray rows = new DynamicIntArray();
+    protected final PositionedOutputStream directStreamOutput;
+    protected final IntegerWriter directLengthOutput;
     private final List<RowIndexEntry> savedRowIndex =
         new ArrayList<RowIndexEntry>();
     private final boolean buildIndex;
-    private final List<Long> rowIndexValueCount = new ArrayList<Long>();
+    private final List<Long> rowIndexValueCount = new ArrayList<>();
     // If the number of keys in a dictionary is greater than this fraction of
     //the total number of non-null rows, turn off dictionary encoding
-    private final float dictionaryKeySizeThreshold;
-    private boolean useDictionaryEncoding = true;
+    private final double dictionaryKeySizeThreshold;
+    protected boolean useDictionaryEncoding = true;
     private boolean isDirectV2 = true;
     private boolean doneDictionaryCheck;
-    private final boolean strideDictionaryCheck;
+    protected final boolean strideDictionaryCheck;
 
-    StringTreeWriter(int columnId,
-                     ObjectInspector inspector,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+    StringBaseTreeWriter(int columnId,
+                         TypeDescription schema,
+                         StreamFactory writer,
+                         boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
       stringOutput = writer.createStream(id,
           OrcProto.Stream.Kind.DICTIONARY_DATA);
@@ -1179,33 +1120,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
       directLengthOutput = createIntegerWriter(writer.createStream(id,
           OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      dictionaryKeySizeThreshold = writer.getConfiguration().getFloat(
-          OrcConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
-          OrcConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.defaultFloatVal);
-      strideDictionaryCheck = writer.getConfiguration().getBoolean(
-          OrcConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK.varname,
-          OrcConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK.defaultBoolVal);
+      Configuration conf = writer.getConfiguration();
+      dictionaryKeySizeThreshold =
+          org.apache.orc.OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+      strideDictionaryCheck =
+          org.apache.orc.OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
       doneDictionaryCheck = false;
     }
 
-    @Override
-    void write(Datum datum) throws IOException {
-      super.write(datum);
-      if (datum != null && datum.isNotNull()) {
-        if (useDictionaryEncoding || !strideDictionaryCheck) {
-          rows.add(dictionary.add(datum.toString()));
-        } else {
-          // write data and length
-          directStreamOutput.write(datum.asByteArray(), 0, datum.size());
-          directLengthOutput.write(datum.size());
-        }
-        indexStatistics.updateString(datum.toString());
-        if (createBloomFilter) {
-          bloomFilter.addBytes(datum.asByteArray(), datum.size());
-        }
-      }
-    }
-
     private boolean checkDictionaryEncoding() {
       if (!doneDictionaryCheck) {
         // Set the flag indicating whether or not to use dictionary encoding
@@ -1271,7 +1193,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           private int currentId = 0;
           @Override
           public void visit(StringRedBlackTree.VisitorContext context
-                           ) throws IOException {
+          ) throws IOException {
             context.writeBytes(stringOutput);
             lengthOutput.write(context.getLength());
             dumpOrder[context.getOriginalPosition()] = currentId++;
@@ -1385,29 +1307,76 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
+  private static class StringTreeWriter extends StringBaseTreeWriter {
+    StringTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+    }
+
+    @Override
+    void write(Datum datum) throws IOException {
+      super.write(datum);
+      if (datum != null && datum.isNotNull()) {
+        if (useDictionaryEncoding || !strideDictionaryCheck) {
+          rows.add(dictionary.add(datum.toString()));
+        } else {
+          // write data and length
+          directStreamOutput.write(datum.asByteArray(), 0, datum.size());
+          directLengthOutput.write(datum.size());
+        }
+        byte[] buf = datum.asByteArray();
+        indexStatistics.updateString(buf, 0, buf.length, 1);
+        if (createBloomFilter) {
+          bloomFilter.addBytes(buf, 0, buf.length);
+        }
+      }
+    }
+  }
+
   /**
    * Under the covers, char is written to ORC the same way as string.
    */
   private static class CharTreeWriter extends StringTreeWriter {
+    private final int itemLength;
+    private final byte[] padding;
 
     CharTreeWriter(int columnId,
-        ObjectInspector inspector,
-        StreamFactory writer,
-        boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+                   TypeDescription schema,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      itemLength = schema.getMaxLength();
+      padding = new byte[itemLength];
     }
-  }
 
-  /**
-   * Under the covers, varchar is written to ORC the same way as string.
-   */
-  private static class VarcharTreeWriter extends StringTreeWriter {
+    @Override
+    void write(Datum datum) throws IOException {
+      super.write(datum);
+      if (datum != null && datum.isNotNull()) {
+        byte[] ptr;
+        byte[] buf = datum.asByteArray();
+        if (buf.length >= itemLength) {
+          ptr = buf;
+        } else {
+          ptr = padding;
+          System.arraycopy(buf, 0, ptr, 0, buf.length);
+          Arrays.fill(ptr, buf.length, itemLength, (byte) ' ');
+        }
+        if (useDictionaryEncoding || !strideDictionaryCheck) {
+          rows.add(dictionary.add(ptr, 0, itemLength));
+        } else {
+          // write data and length
+          directStreamOutput.write(ptr, 0, itemLength);
+          directLengthOutput.write(itemLength);
+        }
 
-    VarcharTreeWriter(int columnId,
-        ObjectInspector inspector,
-        StreamFactory writer,
-        boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+        indexStatistics.updateString(ptr, 0, ptr.length, 1);
+        if (createBloomFilter) {
+          bloomFilter.addBytes(ptr, 0, ptr.length);
+        }
+      }
     }
   }
 
@@ -1417,10 +1386,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     private boolean isDirectV2 = true;
 
     BinaryTreeWriter(int columnId,
-                     ObjectInspector inspector,
+                     TypeDescription schema,
                      StreamFactory writer,
                      boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, schema, writer, nullable);
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
@@ -1443,11 +1412,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     void write(Datum datum) throws IOException {
       super.write(datum);
       if (datum != null && datum.isNotNull()) {
-        stream.write(datum.asByteArray(), 0, datum.size());
+        byte[] buf = datum.asByteArray();
+        stream.write(buf, 0, buf.length);
         length.write(datum.size());
-        indexStatistics.updateBinary(datum);
+        indexStatistics.updateBinary(buf, 0, buf.length, 1);
         if (createBloomFilter) {
-          bloomFilter.addBytes(datum.asByteArray(), datum.size());
+          bloomFilter.addBytes(buf, 0, buf.length);
         }
       }
     }
@@ -1469,7 +1439,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
-  static final int MILLIS_PER_SECOND = 1000;
   static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
 
   private static class TimestampTreeWriter extends TreeWriter {
@@ -1480,10 +1449,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     private TimeZone timeZone;
 
     TimestampTreeWriter(int columnId,
-                     ObjectInspector inspector,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+                        TypeDescription schema,
+                        StreamFactory writer,
+                        boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
       this.seconds = createIntegerWriter(writer.createStream(id,
           OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
@@ -1491,7 +1460,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
       recordPosition(rowIndexPosition);
       // for unit tests to set different time zones
-      this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
+      this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / DateTimeConstants.MSECS_PER_SEC;
       writer.useWriterTimeZone(true);
       timeZone = writer.getTimeZone();
     }
@@ -1517,7 +1486,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
         Timestamp val = new Timestamp(javaTimestamp);
         indexStatistics.updateTimestamp(val);
-        seconds.write((val.getTime() / MILLIS_PER_SECOND) - base_timestamp);
+        seconds.write((val.getTime() / DateTimeConstants.MSECS_PER_SEC) - base_timestamp);
         nanos.write(formatNanos(val.getNanos()));
         if (createBloomFilter) {
           bloomFilter.addLong(val.getTime());
@@ -1563,12 +1532,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     private final boolean isDirectV2;
 
     DateTreeWriter(int columnId,
-                   ObjectInspector inspector,
+                   TypeDescription schema,
                    StreamFactory writer,
                    boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, schema, writer, nullable);
       OutStream out = writer.createStream(id,
-        OrcProto.Stream.Kind.DATA);
+          OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
       this.writer = createIntegerWriter(out, true, isDirectV2, writer);
       recordPosition(rowIndexPosition);
@@ -1614,19 +1583,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   private static class StructTreeWriter extends TreeWriter {
-    private final List<? extends StructField> fields;
     StructTreeWriter(int columnId,
-                     ObjectInspector inspector,
+                     TypeDescription schema,
                      StreamFactory writer,
                      boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
-      StructObjectInspector structObjectInspector =
-        (StructObjectInspector) inspector;
-      fields = structObjectInspector.getAllStructFieldRefs();
-      childrenWriters = new TreeWriter[fields.size()];
+      super(columnId, schema, writer, nullable);
+      List<TypeDescription> children = schema.getChildren();
+      childrenWriters = new TreeWriter[children.size()];
       for(int i=0; i < childrenWriters.length; ++i) {
         childrenWriters[i] = createTreeWriter(
-          fields.get(i).getFieldObjectInspector(), writer, true);
+            children.get(i), writer,
+            true);
       }
       recordPosition(rowIndexPosition);
     }
@@ -1638,9 +1605,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     void writeTuple(Tuple tuple) throws IOException {
       super.write(tuple);
       if (tuple != null) {
-        for(int i = 0; i < fields.size(); ++i) {
-          TreeWriter writer = childrenWriters[i];
-          writer.write(tuple.asDatum(i));
+        for(int i = 0; i < childrenWriters.length; ++i) {
+          childrenWriters[i].write(tuple.asDatum(i));
         }
       }
     }
@@ -1656,159 +1622,136 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
-  private static TreeWriter createTreeWriter(ObjectInspector inspector,
+  private static TreeWriter createTreeWriter(TypeDescription schema,
                                              StreamFactory streamFactory,
                                              boolean nullable) throws IOException {
-    switch (inspector.getCategory()) {
-      case PRIMITIVE:
-        switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
-          case BOOLEAN:
-          case VOID:
-            return new BooleanTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case BYTE:
-            return new ByteTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case SHORT:
-          case INT:
-          case LONG:
-            return new IntegerTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case FLOAT:
-            return new FloatTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case DOUBLE:
-            return new DoubleTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case STRING:
-            return new StringTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case CHAR:
-            return new CharTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case VARCHAR:
-            return new VarcharTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case BINARY:
-            return new BinaryTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case TIMESTAMP:
-            return new TimestampTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case DATE:
-            return new DateTreeWriter(streamFactory.getNextColumnId(),
-              inspector, streamFactory, nullable);
-          default:
-            throw new IllegalArgumentException("Bad primitive category " +
-              ((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
-        }
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return new BooleanTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case BYTE:
+        return new ByteTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case SHORT:
+      case INT:
+      case LONG:
+        return new IntegerTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case FLOAT:
+        return new FloatTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case DOUBLE:
+        return new DoubleTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case STRING:
+        return new StringTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case CHAR:
+        return new CharTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case BINARY:
+        return new BinaryTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case TIMESTAMP:
+        return new TimestampTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case DATE:
+        return new DateTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
       case STRUCT:
-        return new StructTreeWriter(streamFactory.getNextColumnId(), inspector,
-            streamFactory, nullable);
+        return new StructTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
       default:
         throw new IllegalArgumentException("Bad category: " +
-          inspector.getCategory());
+            schema.getCategory());
     }
   }
 
   private static void writeTypes(OrcProto.Footer.Builder builder,
-                                 TreeWriter treeWriter) {
+                                 TypeDescription schema) {
     OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
-    switch (treeWriter.inspector.getCategory()) {
-      case PRIMITIVE:
-        switch (((PrimitiveObjectInspector) treeWriter.inspector).
-                 getPrimitiveCategory()) {
-          case VOID:
-          case BOOLEAN:
-            type.setKind(OrcProto.Type.Kind.BOOLEAN);
-            break;
-          case BYTE:
-            type.setKind(OrcProto.Type.Kind.BYTE);
-            break;
-          case SHORT:
-            type.setKind(OrcProto.Type.Kind.SHORT);
-            break;
-          case INT:
-            type.setKind(OrcProto.Type.Kind.INT);
-            break;
-          case LONG:
-            type.setKind(OrcProto.Type.Kind.LONG);
-            break;
-          case FLOAT:
-            type.setKind(OrcProto.Type.Kind.FLOAT);
-            break;
-          case DOUBLE:
-            type.setKind(OrcProto.Type.Kind.DOUBLE);
-            break;
-          case STRING:
-            type.setKind(OrcProto.Type.Kind.STRING);
-            break;
-          case CHAR:
-            // The char length needs to be written to file and should be available
-            // from the object inspector
-            CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo();
-            type.setKind(Type.Kind.CHAR);
-            type.setMaximumLength(charTypeInfo.getLength());
-            break;
-          case VARCHAR:
-            // The varchar length needs to be written to file and should be available
-            // from the object inspector
-            VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo();
-            type.setKind(Type.Kind.VARCHAR);
-            type.setMaximumLength(typeInfo.getLength());
-            break;
-          case BINARY:
-            type.setKind(OrcProto.Type.Kind.BINARY);
-            break;
-          case TIMESTAMP:
-            type.setKind(OrcProto.Type.Kind.TIMESTAMP);
-            break;
-          case DATE:
-            type.setKind(OrcProto.Type.Kind.DATE);
-            break;
-          case DECIMAL:
-            DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)((PrimitiveObjectInspector)treeWriter.inspector).getTypeInfo();
-            type.setKind(OrcProto.Type.Kind.DECIMAL);
-            type.setPrecision(decTypeInfo.precision());
-            type.setScale(decTypeInfo.scale());
-            break;
-          default:
-            throw new IllegalArgumentException("Unknown primitive category: " +
-              ((PrimitiveObjectInspector) treeWriter.inspector).
-                getPrimitiveCategory());
-        }
+    List<TypeDescription> children = schema.getChildren();
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        type.setKind(OrcProto.Type.Kind.BOOLEAN);
+        break;
+      case BYTE:
+        type.setKind(OrcProto.Type.Kind.BYTE);
+        break;
+      case SHORT:
+        type.setKind(OrcProto.Type.Kind.SHORT);
+        break;
+      case INT:
+        type.setKind(OrcProto.Type.Kind.INT);
+        break;
+      case LONG:
+        type.setKind(OrcProto.Type.Kind.LONG);
+        break;
+      case FLOAT:
+        type.setKind(OrcProto.Type.Kind.FLOAT);
+        break;
+      case DOUBLE:
+        type.setKind(OrcProto.Type.Kind.DOUBLE);
+        break;
+      case STRING:
+        type.setKind(OrcProto.Type.Kind.STRING);
+        break;
+      case CHAR:
+        type.setKind(OrcProto.Type.Kind.CHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case VARCHAR:
+        type.setKind(OrcProto.Type.Kind.VARCHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case BINARY:
+        type.setKind(OrcProto.Type.Kind.BINARY);
+        break;
+      case TIMESTAMP:
+        type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+        break;
+      case DATE:
+        type.setKind(OrcProto.Type.Kind.DATE);
+        break;
+      case DECIMAL:
+        type.setKind(OrcProto.Type.Kind.DECIMAL);
+        type.setPrecision(schema.getPrecision());
+        type.setScale(schema.getScale());
         break;
       case LIST:
         type.setKind(OrcProto.Type.Kind.LIST);
-        type.addSubtypes(treeWriter.childrenWriters[0].id);
+        type.addSubtypes(children.get(0).getId());
         break;
       case MAP:
         type.setKind(OrcProto.Type.Kind.MAP);
-        type.addSubtypes(treeWriter.childrenWriters[0].id);
-        type.addSubtypes(treeWriter.childrenWriters[1].id);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
+        }
         break;
       case STRUCT:
         type.setKind(OrcProto.Type.Kind.STRUCT);
-        for(TreeWriter child: treeWriter.childrenWriters) {
-          type.addSubtypes(child.id);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
         }
-        for(StructField field: ((StructTreeWriter) treeWriter).fields) {
-          type.addFieldNames(field.getFieldName());
+        for(String field: schema.getFieldNames()) {
+          type.addFieldNames(field);
         }
         break;
       case UNION:
         type.setKind(OrcProto.Type.Kind.UNION);
-        for(TreeWriter child: treeWriter.childrenWriters) {
-          type.addSubtypes(child.id);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
         }
         break;
       default:
         throw new IllegalArgumentException("Unknown category: " +
-          treeWriter.inspector.getCategory());
+            schema.getCategory());
     }
     builder.addTypes(type);
-    for(TreeWriter child: treeWriter.childrenWriters) {
-      writeTypes(builder, child);
+    if (children != null) {
+      for(TypeDescription child: children) {
+        writeTypes(builder, child);
+      }
     }
   }
 
@@ -1855,9 +1798,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           StreamName name = pair.getKey();
           long streamSize = pair.getValue().getOutputSize();
           builder.addStreams(OrcProto.Stream.newBuilder()
-                             .setColumn(name.getColumn())
-                             .setKind(name.getKind())
-                             .setLength(streamSize));
+              .setColumn(name.getColumn())
+              .setKind(name.getKind())
+              .setLength(streamSize));
           if (StreamName.Area.INDEX == name.getArea()) {
             indexSize += streamSize;
           } else {
@@ -1882,8 +1825,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         // and user specified padding tolerance. Since stripe size can overflow
         // the default stripe size we should apply this correction to avoid
         // writing portion of last stripe to next hdfs block.
-        float correction = overflow > 0 ? (float) overflow
-            / (float) adjustedStripeSize : 0.0f;
+        double correction = overflow > 0 ? (double) overflow
+            / (double) adjustedStripeSize : 0.0;
 
         // correction should not be greater than user specified padding
         // tolerance
@@ -1941,75 +1884,60 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   private long computeRawDataSize() {
-    long result = 0;
-    for (TreeWriter child : treeWriter.getChildrenWriters()) {
-      result += getRawDataSizeFromInspectors(child, child.inspector);
-    }
-    return result;
+    return getRawDataSize(treeWriter, schema);
   }
 
-  private long getRawDataSizeFromInspectors(TreeWriter child, ObjectInspector oi) {
+  private long getRawDataSize(TreeWriter child,
+                              TypeDescription schema) {
     long total = 0;
-    switch (oi.getCategory()) {
-    case PRIMITIVE:
-      total += getRawDataSizeFromPrimitives(child, oi);
-      break;
-    case LIST:
-    case MAP:
-    case UNION:
-    case STRUCT:
-      for (TreeWriter tw : child.childrenWriters) {
-        total += getRawDataSizeFromInspectors(tw, tw.inspector);
+    long numVals = child.fileStatistics.getNumberOfValues();
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case FLOAT:
+        return numVals * JavaDataModel.get().primitive1();
+      case LONG:
+      case DOUBLE:
+        return numVals * JavaDataModel.get().primitive2();
+      case STRING:
+      case VARCHAR:
+      case CHAR:
+        // ORC strings are converted to java Strings. so use JavaDataModel to
+        // compute the overall size of strings
+        StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics;
+        numVals = numVals == 0 ? 1 : numVals;
+        int avgStringLen = (int) (scs.getSum() / numVals);
+        return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen);
+      case DECIMAL:
+        return numVals * JavaDataModel.get().lengthOfDecimal();
+      case DATE:
+        return numVals * JavaDataModel.get().lengthOfDate();
+      case BINARY:
+        // get total length of binary blob
+        BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics;
+        return bcs.getSum();
+      case TIMESTAMP:
+        return numVals * JavaDataModel.get().lengthOfTimestamp();
+      case LIST:
+      case MAP:
+      case UNION:
+      case STRUCT: {
+        TreeWriter[] childWriters = child.getChildrenWriters();
+        List<TypeDescription> childTypes = schema.getChildren();
+        for (int i=0; i < childWriters.length; ++i) {
+          total += getRawDataSize(childWriters[i], childTypes.get(i));
+        }
+        break;
       }
-      break;
-    default:
-      LOG.debug("Unknown object inspector category.");
-      break;
+      default:
+        LOG.debug("Unknown object inspector category.");
+        break;
     }
     return total;
   }
 
-  private long getRawDataSizeFromPrimitives(TreeWriter child, ObjectInspector oi) {
-    long result = 0;
-    long numVals = child.fileStatistics.getNumberOfValues();
-    switch (((PrimitiveObjectInspector) oi).getPrimitiveCategory()) {
-    case BOOLEAN:
-    case BYTE:
-    case SHORT:
-    case INT:
-    case FLOAT:
-      return numVals * JavaDataModel.get().primitive1();
-    case LONG:
-    case DOUBLE:
-      return numVals * JavaDataModel.get().primitive2();
-    case STRING:
-    case VARCHAR:
-    case CHAR:
-      // ORC strings are converted to java Strings. so use JavaDataModel to
-      // compute the overall size of strings
-      child = (StringTreeWriter) child;
-      StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics;
-      numVals = numVals == 0 ? 1 : numVals;
-      int avgStringLen = (int) (scs.getSum() / numVals);
-      return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen);
-    case DECIMAL:
-      return numVals * JavaDataModel.get().lengthOfDecimal();
-    case DATE:
-      return numVals * JavaDataModel.get().lengthOfDate();
-    case BINARY:
-      // get total length of binary blob
-      BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics;
-      return bcs.getSum();
-    case TIMESTAMP:
-      return numVals * JavaDataModel.get().lengthOfTimestamp();
-    default:
-      LOG.debug("Unknown primitive category.");
-      break;
-    }
-
-    return result;
-  }
-
   private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
     switch (kind) {
       case NONE: return OrcProto.CompressionKind.NONE;
@@ -2029,7 +1957,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
-  private int writeMetadata(long bodyLength) throws IOException {
+  private int writeMetadata() throws IOException {
     getStream();
     OrcProto.Metadata.Builder builder = OrcProto.Metadata.newBuilder();
     for(OrcProto.StripeStatistics.Builder ssb : treeWriter.stripeStatsBuilders) {
@@ -2054,7 +1982,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     // populate raw data size
     rawDataSize = computeRawDataSize();
     // serialize the types
-    writeTypes(builder, treeWriter);
+    writeTypes(builder, schema);
     // add the stripe information
     for(OrcProto.StripeInformation stripe: stripes) {
       builder.addStripes(stripe);
@@ -2064,7 +1992,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     // add all of the user metadata
     for(Map.Entry<String, ByteString> entry: userMetadata.entrySet()) {
       builder.addMetadata(OrcProto.UserMetadataItem.newBuilder()
-        .setName(entry.getKey()).setValue(entry.getValue()));
+          .setName(entry.getKey()).setValue(entry.getValue()));
     }
     long startPosn = rawWriter.getPos();
     OrcProto.Footer footer = builder.build();
@@ -2076,14 +2004,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
   private int writePostScript(int footerLength, int metadataLength) throws IOException {
     OrcProto.PostScript.Builder builder =
-      OrcProto.PostScript.newBuilder()
-        .setCompression(writeCompressionKind(compress))
-        .setFooterLength(footerLength)
-        .setMetadataLength(metadataLength)
-        .setMagic(OrcFile.MAGIC)
-        .addVersion(version.getMajor())
-        .addVersion(version.getMinor())
-        .setWriterVersion(OrcFile.WriterVersion.HIVE_8732.getId());
+        OrcProto.PostScript.newBuilder()
+            .setCompression(writeCompressionKind(compress))
+            .setFooterLength(footerLength)
+            .setMetadataLength(metadataLength)
+            .setMagic(OrcFile.MAGIC)
+            .addVersion(version.getMajor())
+            .addVersion(version.getMinor())
+            .setWriterVersion(OrcFile.CURRENT_WRITER.getId());
     if (compress != CompressionKind.NONE) {
       builder.setCompressionBlockSize(bufferSize);
     }
@@ -2122,7 +2050,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         createRowIndexEntry();
       }
     }
-    memoryManager.addedRow();
+    memoryManager.addedRow(1);
   }
 
   @Override
@@ -2134,7 +2062,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     memoryManager.removeWriter(path);
     // actually close the file
     flushStripe();
-    int metadataLength = writeMetadata(rawWriter.getPos());
+    int metadataLength = writeMetadata();
     int footerLength = writeFooter(rawWriter.getPos() - metadataLength);
     rawWriter.writeByte(writePostScript(footerLength, metadataLength));
     rawWriter.close();
@@ -2167,19 +2095,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       if (callback != null) {
         callback.preFooterWrite(callbackContext);
       }
-      int metaLength = writeMetadata(rawWriter.getPos());
+      int metaLength = writeMetadata();
       int footLength = writeFooter(rawWriter.getPos() - metaLength);
       rawWriter.writeByte(writePostScript(footLength, metaLength));
       stripesAtLastFlush = stripes.size();
-      ShimLoader.getHadoopShims().hflush(rawWriter);
+      rawWriter.hflush();
     }
     return rawWriter.getPos();
   }
 
   @Override
   public void appendStripe(byte[] stripe, int offset, int length,
-      StripeInformation stripeInfo,
-      OrcProto.StripeStatistics stripeStatistics) throws IOException {
+                           StripeInformation stripeInfo,
+                           OrcProto.StripeStatistics stripeStatistics) throws IOException {
     checkArgument(stripe != null, "Stripe must not be null");
     checkArgument(length <= stripe.length,
         "Specified length must not be greater specified array length");
@@ -2189,12 +2117,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     getStream();
     long start = rawWriter.getPos();
-    long stripeLen = length;
     long availBlockSpace = blockSize - (start % blockSize);
 
     // see if stripe can fit in the current hdfs block, else pad the remaining
     // space in the block
-    if (stripeLen < blockSize && stripeLen > availBlockSpace &&
+    if (length < blockSize && length > availBlockSpace &&
         addBlockPadding) {
       byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
       LOG.info(String.format("Padding ORC by %d bytes while merging..",
@@ -2247,7 +2174,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   private void getAllColumnTreeWritersImpl(TreeWriter tw,
-      List<TreeWriter> result) {
+                                           List<TreeWriter> result) {
     result.add(tw);
     for (TreeWriter child : tw.childrenWriters) {
       getAllColumnTreeWritersImpl(child, result);
@@ -2255,9 +2182,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   @Override
-  public void appendUserMetadata(List<UserMetadataItem> userMetadata) {
+  public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata) {
     if (userMetadata != null) {
-      for (UserMetadataItem item : userMetadata) {
+      for (OrcProto.UserMetadataItem item : userMetadata) {
         this.userMetadata.put(item.getName(), item.getValue());
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZeroCopyAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZeroCopyAdapter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZeroCopyAdapter.java
new file mode 100644
index 0000000..2886fe7
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZeroCopyAdapter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.ReadOption;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+public class ZeroCopyAdapter {
+  private final FSDataInputStream in;
+  private final ByteBufferPoolAdapter pool;
+  private final static EnumSet<ReadOption> CHECK_SUM = EnumSet
+      .noneOf(ReadOption.class);
+  private final static EnumSet<ReadOption> NO_CHECK_SUM = EnumSet
+      .of(ReadOption.SKIP_CHECKSUMS);
+
+  public ZeroCopyAdapter(FSDataInputStream in, ByteBufferAllocatorPool poolshim) {
+    this.in = in;
+    if (poolshim != null) {
+      pool = new ByteBufferPoolAdapter(poolshim);
+    } else {
+      pool = null;
+    }
+  }
+
+  public final ByteBuffer readBuffer(int maxLength, boolean verifyChecksums)
+      throws IOException {
+    EnumSet<ReadOption> options = NO_CHECK_SUM;
+    if (verifyChecksums) {
+      options = CHECK_SUM;
+    }
+    return this.in.read(this.pool, maxLength, options);
+  }
+
+  public final void releaseBuffer(ByteBuffer buffer) {
+    this.in.releaseBuffer(buffer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java
deleted file mode 100644
index d0a8fa7..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
-import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
-import org.apache.hadoop.hive.shims.ShimLoader;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumSet;
-import java.util.zip.DataFormatException;
-import java.util.zip.Deflater;
-import java.util.zip.Inflater;
-
-class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
-
-  private Boolean direct = null;
-
-  private final int level;
-  private final int strategy;
-
-  public ZlibCodec() {
-    level = Deflater.DEFAULT_COMPRESSION;
-    strategy = Deflater.DEFAULT_STRATEGY;
-  }
-
-  private ZlibCodec(int level, int strategy) {
-    this.level = level;
-    this.strategy = strategy;
-  }
-
-  @Override
-  public boolean compress(ByteBuffer in, ByteBuffer out,
-                          ByteBuffer overflow) throws IOException {
-    Deflater deflater = new Deflater(level, true);
-    deflater.setStrategy(strategy);
-    int length = in.remaining();
-    deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
-    deflater.finish();
-    int outSize = 0;
-    int offset = out.arrayOffset() + out.position();
-    while (!deflater.finished() && (length > outSize)) {
-      int size = deflater.deflate(out.array(), offset, out.remaining());
-      out.position(size + out.position());
-      outSize += size;
-      offset += size;
-      // if we run out of space in the out buffer, use the overflow
-      if (out.remaining() == 0) {
-        if (overflow == null) {
-          deflater.end();
-          return false;
-        }
-        out = overflow;
-        offset = out.arrayOffset() + out.position();
-      }
-    }
-    deflater.end();
-    return length > outSize;
-  }
-
-  @Override
-  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
-
-    if(in.isDirect() && out.isDirect()) {
-      directDecompress(in, out);
-      return;
-    }
-
-    Inflater inflater = new Inflater(true);
-    inflater.setInput(in.array(), in.arrayOffset() + in.position(),
-                      in.remaining());
-    while (!(inflater.finished() || inflater.needsDictionary() ||
-             inflater.needsInput())) {
-      try {
-        int count = inflater.inflate(out.array(),
-                                     out.arrayOffset() + out.position(),
-                                     out.remaining());
-        out.position(count + out.position());
-      } catch (DataFormatException dfe) {
-        throw new IOException("Bad compression data", dfe);
-      }
-    }
-    out.flip();
-    inflater.end();
-    in.position(in.limit());
-  }
-
-  @Override
-  public boolean isAvailable() {
-    if (direct == null) {
-      // see nowrap option in new Inflater(boolean) which disables zlib headers
-      try {
-        if (ShimLoader.getHadoopShims().getDirectDecompressor(
-            DirectCompressionType.ZLIB_NOHEADER) != null) {
-          direct = Boolean.valueOf(true);
-        } else {
-          direct = Boolean.valueOf(false);
-        }
-      } catch (UnsatisfiedLinkError ule) {
-        direct = Boolean.valueOf(false);
-      }
-    }
-    return direct.booleanValue();
-  }
-
-  @Override
-  public void directDecompress(ByteBuffer in, ByteBuffer out)
-      throws IOException {
-    DirectDecompressorShim decompressShim = ShimLoader.getHadoopShims()
-        .getDirectDecompressor(DirectCompressionType.ZLIB_NOHEADER);
-    decompressShim.decompress(in, out);
-    out.flip(); // flip for read
-  }
-
-  @Override
-  public CompressionCodec modify(@Nullable EnumSet<Modifier> modifiers) {
-
-    if (modifiers == null) {
-      return this;
-    }
-
-    int l = this.level;
-    int s = this.strategy;
-
-    for (Modifier m : modifiers) {
-      switch (m) {
-      case BINARY:
-        /* filtered == less LZ77, more huffman */
-        s = Deflater.FILTERED;
-        break;
-      case TEXT:
-        s = Deflater.DEFAULT_STRATEGY;
-        break;
-      case FASTEST:
-        // deflate_fast looking for 8 byte patterns
-        l = Deflater.BEST_SPEED;
-        break;
-      case FAST:
-        // deflate_fast looking for 16 byte patterns
-        l = Deflater.BEST_SPEED + 1;
-        break;
-      case DEFAULT:
-        // deflate_slow looking for 128 byte patterns
-        l = Deflater.DEFAULT_COMPRESSION;
-        break;
-      default:
-        break;
-      }
-    }
-    return new ZlibCodec(l, s);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto b/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto
deleted file mode 100644
index c80cf6c..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/proto/orc_proto.proto
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.orc;
-
-message IntegerStatistics  {
-  optional sint64 minimum = 1;
-  optional sint64 maximum = 2;
-  optional sint64 sum = 3;
-}
-
-message DoubleStatistics {
-  optional double minimum = 1;
-  optional double maximum = 2;
-  optional double sum = 3;
-}
-
-message StringStatistics {
-  optional string minimum = 1;
-  optional string maximum = 2;
-  // sum will store the total length of all strings in a stripe
-  optional sint64 sum = 3;
-}
-
-message BucketStatistics {
-  repeated uint64 count = 1 [packed=true];
-}
-
-message DecimalStatistics {
-  optional string minimum = 1;
-  optional string maximum = 2;
-  optional string sum = 3;
-}
-
-message DateStatistics {
-  // min,max values saved as days since epoch
-  optional sint32 minimum = 1;
-  optional sint32 maximum = 2;
-}
-
-message TimestampStatistics {
-  // min,max values saved as milliseconds since epoch
-  optional sint64 minimum = 1;
-  optional sint64 maximum = 2;
-}
-
-message BinaryStatistics {
-  // sum will store the total binary blob length in a stripe
-  optional sint64 sum = 1;
-}
-
-message ColumnStatistics {
-  optional uint64 numberOfValues = 1;
-  optional IntegerStatistics intStatistics = 2;
-  optional DoubleStatistics doubleStatistics = 3;
-  optional StringStatistics stringStatistics = 4;
-  optional BucketStatistics bucketStatistics = 5;
-  optional DecimalStatistics decimalStatistics = 6;
-  optional DateStatistics dateStatistics = 7;
-  optional BinaryStatistics binaryStatistics = 8;
-  optional TimestampStatistics timestampStatistics = 9;
-  optional bool hasNull = 10;
-}
-
-message RowIndexEntry {
-  repeated uint64 positions = 1 [packed=true];
-  optional ColumnStatistics statistics = 2;
-}
-
-message RowIndex {
-  repeated RowIndexEntry entry = 1;
-}
-
-message BloomFilter {
-  optional uint32 numHashFunctions = 1;
-  repeated fixed64 bitset = 2;
-}
-
-message BloomFilterIndex {
-  repeated BloomFilter bloomFilter = 1;
-}
-
-message Stream {
-  // if you add new index stream kinds, you need to make sure to update
-  // StreamName to ensure it is added to the stripe in the right area
-  enum Kind {
-    PRESENT = 0;
-    DATA = 1;
-    LENGTH = 2;
-    DICTIONARY_DATA = 3;
-    DICTIONARY_COUNT = 4;
-    SECONDARY = 5;
-    ROW_INDEX = 6;
-    BLOOM_FILTER = 7;
-  }
-  optional Kind kind = 1;
-  optional uint32 column = 2;
-  optional uint64 length = 3;
-}
-
-message ColumnEncoding {
-  enum Kind {
-    DIRECT = 0;
-    DICTIONARY = 1;
-    DIRECT_V2 = 2;
-    DICTIONARY_V2 = 3;
-  }
-  optional Kind kind = 1;
-  optional uint32 dictionarySize = 2;
-}
-
-message StripeFooter {
-  repeated Stream streams = 1;
-  repeated ColumnEncoding columns = 2;
-  optional string writerTimezone = 3;
-}
-
-message Type {
-  enum Kind {
-    BOOLEAN = 0;
-    BYTE = 1;
-    SHORT = 2;
-    INT = 3;
-    LONG = 4;
-    FLOAT = 5;
-    DOUBLE = 6;
-    STRING = 7;
-    BINARY = 8;
-    TIMESTAMP = 9;
-    LIST = 10;
-    MAP = 11;
-    STRUCT = 12;
-    UNION = 13;
-    DECIMAL = 14;
-    DATE = 15;
-    VARCHAR = 16;
-    CHAR = 17;
-  }
-  optional Kind kind = 1;
-  repeated uint32 subtypes = 2 [packed=true];
-  repeated string fieldNames = 3;
-  optional uint32 maximumLength = 4;
-  optional uint32 precision = 5;
-  optional uint32 scale = 6;
-}
-
-message StripeInformation {
-  optional uint64 offset = 1;
-  optional uint64 indexLength = 2;
-  optional uint64 dataLength = 3;
-  optional uint64 footerLength = 4;
-  optional uint64 numberOfRows = 5;
-}
-
-message UserMetadataItem {
-  optional string name = 1;
-  optional bytes value = 2;
-}
-
-message StripeStatistics {
-  repeated ColumnStatistics colStats = 1;
-}
-
-message Metadata {
-  repeated StripeStatistics stripeStats = 1;
-}
-
-message Footer {
-  optional uint64 headerLength = 1;
-  optional uint64 contentLength = 2;
-  repeated StripeInformation stripes = 3;
-  repeated Type types = 4;
-  repeated UserMetadataItem metadata = 5;
-  optional uint64 numberOfRows = 6;
-  repeated ColumnStatistics statistics = 7;
-  optional uint32 rowIndexStride = 8;
-}
-
-enum CompressionKind {
-  NONE = 0;
-  ZLIB = 1;
-  SNAPPY = 2;
-  LZO = 3;
-}
-
-// Serialized length must be less that 255 bytes
-message PostScript {
-  optional uint64 footerLength = 1;
-  optional CompressionKind compression = 2;
-  optional uint64 compressionBlockSize = 3;
-  // the version of the file format
-  //   [0, 11] = Hive 0.11
-  //   [0, 12] = Hive 0.12
-  repeated uint32 version = 4 [packed = true];
-  optional uint64 metadataLength = 5;
-  // Version of the writer:
-  //   0 (or missing) = original
-  //   1 = HIVE-8732 fixed
-  optional uint32 writerVersion = 6;
-  // Leave this last in the record
-  optional string magic = 8000;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 96e58eb..1b39070 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.*;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.orc.OrcConf;
 import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
@@ -61,6 +62,7 @@ public class TestCompressionStorages {
   public TestCompressionStorages(String type) throws IOException {
     this.dataFormat = type;
     conf = new TajoConf();
+    conf.setBoolean("hive.exec.orc.zerocopy", true);
 
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
@@ -71,7 +73,8 @@ public class TestCompressionStorages {
     return Arrays.asList(new Object[][]{
         {BuiltinStorages.TEXT},
         {BuiltinStorages.RCFILE},
-        {BuiltinStorages.SEQUENCE_FILE}
+        {BuiltinStorages.SEQUENCE_FILE},
+        {BuiltinStorages.ORC}
     });
   }
 
@@ -120,6 +123,14 @@ public class TestCompressionStorages {
     meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
     meta.putOption("sequencefile.serde", TextSerializerDeserializer.class.getName());
 
+    if (codec.equals(SnappyCodec.class)) {
+      meta.putOption(OrcConf.COMPRESS.getAttribute(), "SNAPPY");
+    } else if (codec.equals(Lz4Codec.class)) {
+      meta.putOption(OrcConf.COMPRESS.getAttribute(), "ZLIB");
+    } else {
+      meta.putOption(OrcConf.COMPRESS.getAttribute(), "NONE");
+    }
+
     String fileName = "Compression_" + codec.getSimpleName();
     Path tablePath = new Path(testDir, fileName);
     Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index b37270b..b4a373f 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -167,6 +167,21 @@ public class TestStorages {
    fs.delete(testDir, true);
   }
 
+  private boolean protoTypeSupport() {
+    return internalType;
+  }
+
+  private boolean timeTypeSupport() {
+    return internalType
+        || dataFormat.equalsIgnoreCase(BuiltinStorages.TEXT);
+  }
+
+  private boolean dateTypeSupport() {
+    return internalType
+        || dataFormat.equalsIgnoreCase(BuiltinStorages.TEXT)
+        || dataFormat.equalsIgnoreCase(BuiltinStorages.ORC);
+  }
+
   @Test
   public void testSplitable() throws IOException {
     if (splitable) {
@@ -385,8 +400,6 @@ public class TestStorages {
 
   @Test
   public void testVariousTypes() throws IOException {
-    boolean handleProtobuf = !dataFormat.equalsIgnoreCase(BuiltinStorages.JSON);
-
     Schema schema = new Schema();
     schema.addColumn("col1", Type.BOOLEAN);
     schema.addColumn("col2", Type.CHAR, 7);
@@ -398,7 +411,7 @@ public class TestStorages {
     schema.addColumn("col8", Type.TEXT);
     schema.addColumn("col9", Type.BLOB);
     schema.addColumn("col10", Type.INET4);
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       schema.addColumn("col11", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
     }
 
@@ -418,7 +431,7 @@ public class TestStorages {
     QueryId queryid = new QueryId("12345", 5);
     ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
 
-    VTuple tuple = new VTuple(10 + (handleProtobuf ? 1 : 0));
+    VTuple tuple = new VTuple(10 + (protoTypeSupport() ? 1 : 0));
     tuple.put(new Datum[] {
         DatumFactory.createBool(true),
         DatumFactory.createChar("hyunsik"),
@@ -432,7 +445,7 @@ public class TestStorages {
         DatumFactory.createInet4("192.168.0.1"),
     });
 
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       tuple.put(10, factory.createDatum(queryid.getProto()));
     }
 
@@ -456,8 +469,6 @@ public class TestStorages {
 
   @Test
   public void testNullHandlingTypes() throws IOException {
-    boolean handleProtobuf = !dataFormat.equalsIgnoreCase(BuiltinStorages.JSON);
-
     Schema schema = new Schema();
     schema.addColumn("col1", Type.BOOLEAN);
     schema.addColumn("col2", Type.CHAR, 7);
@@ -470,7 +481,7 @@ public class TestStorages {
     schema.addColumn("col9", Type.BLOB);
     schema.addColumn("col10", Type.INET4);
 
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       schema.addColumn("col11", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
     }
 
@@ -492,7 +503,7 @@ public class TestStorages {
 
     QueryId queryid = new QueryId("12345", 5);
     ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-    int columnNum = 10 + (handleProtobuf ? 1 : 0);
+    int columnNum = 10 + (protoTypeSupport() ? 1 : 0);
     VTuple seedTuple = new VTuple(columnNum);
     seedTuple.put(new Datum[]{
         DatumFactory.createBool(true),                // 0
@@ -507,7 +518,7 @@ public class TestStorages {
         DatumFactory.createInet4("192.168.0.1")       // 10
     });
 
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       seedTuple.put(10, factory.createDatum(queryid.getProto()));       // 11
     }
 
@@ -553,8 +564,6 @@ public class TestStorages {
   public void testNullHandlingTypesWithProjection() throws IOException {
     if (internalType) return;
 
-    boolean handleProtobuf = !dataFormat.equalsIgnoreCase(BuiltinStorages.JSON);
-
     Schema schema = new Schema();
     schema.addColumn("col1", Type.BOOLEAN);
     schema.addColumn("col2", Type.CHAR, 7);
@@ -567,7 +576,7 @@ public class TestStorages {
     schema.addColumn("col9", Type.BLOB);
     schema.addColumn("col10", Type.INET4);
 
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       schema.addColumn("col11", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
     }
 
@@ -589,7 +598,7 @@ public class TestStorages {
 
     QueryId queryid = new QueryId("12345", 5);
     ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-    int columnNum = 10 + (handleProtobuf ? 1 : 0);
+    int columnNum = 10 + (protoTypeSupport() ? 1 : 0);
     VTuple seedTuple = new VTuple(columnNum);
     seedTuple.put(new Datum[]{
         DatumFactory.createBool(true),                // 0
@@ -604,7 +613,7 @@ public class TestStorages {
         DatumFactory.createInet4("192.168.0.1")       // 10
     });
 
-    if (handleProtobuf) {
+    if (protoTypeSupport()) {
       seedTuple.put(10, factory.createDatum(queryid.getProto()));       // 11
     }
 
@@ -933,11 +942,17 @@ public class TestStorages {
 
   @Test
   public void testTime() throws IOException {
-    if (dataFormat.equalsIgnoreCase(BuiltinStorages.TEXT) || internalType) {
+    if (dateTypeSupport() || timeTypeSupport()) {
+
+      int index = 2;
       Schema schema = new Schema();
-      schema.addColumn("col1", Type.DATE);
-      schema.addColumn("col2", Type.TIME);
-      schema.addColumn("col3", Type.TIMESTAMP);
+      schema.addColumn("col1", Type.TIMESTAMP);
+      if (dateTypeSupport()) {
+        schema.addColumn("col" + index++, Type.DATE);
+      }
+      if (timeTypeSupport()) {
+        schema.addColumn("col" + index++, Type.TIME);
+      }
 
       KeyValueSet options = new KeyValueSet();
       TableMeta meta = CatalogUtil.newTableMeta(dataFormat, options);
@@ -947,11 +962,15 @@ public class TestStorages {
       Appender appender = sm.getAppender(meta, schema, tablePath);
       appender.init();
 
-      VTuple tuple = new VTuple(new Datum[]{
-          DatumFactory.createDate("1980-04-01"),
-          DatumFactory.createTime("12:34:56"),
-          DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000))
-      });
+      VTuple tuple = new VTuple(index - 1);
+      index = 0;
+      tuple.put(index++, DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000)));
+      if (dateTypeSupport()) {
+        tuple.put(index++, DatumFactory.createDate("1980-04-01"));
+      }
+      if (timeTypeSupport()) {
+        tuple.put(index, DatumFactory.createTime("12:34:56"));
+      }
       appender.addTuple(tuple);
       appender.flush();
       appender.close();
@@ -964,7 +983,7 @@ public class TestStorages {
       Tuple retrieved;
       while ((retrieved = scanner.next()) != null) {
         for (int i = 0; i < tuple.size(); i++) {
-          assertEquals(tuple.get(i), retrieved.asDatum(i));
+          assertEquals("failed at " + i + " th column", tuple.get(i), retrieved.asDatum(i));
         }
       }
       scanner.close();

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
index f71f052..f1d1368 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
@@ -12,8 +12,7 @@
     { "name": "col7", "type": "double" },
     { "name": "col8", "type": "string" },
     { "name": "col9", "type": "bytes" },
-    { "name": "col10", "type": "bytes" },
-    { "name": "col11", "type": "bytes" }
+    { "name": "col10", "type": "bytes" }
   ]
 }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
index 6f7e53b..3283f9f 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@ -117,7 +117,7 @@
 
   <property>
     <name>tajo.storage.scanner-handler.orc.class</name>
-    <value>org.apache.tajo.storage.orc.ORCScanner</value>
+    <value>org.apache.tajo.storage.orc.OrcScanner</value>
   </property>
 
   <property>

Reply via email to