Repository: asterixdb
Updated Branches:
  refs/heads/master 74271c023 -> 5226ca874


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFile.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFile.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFile.java
new file mode 100644
index 0000000..0c5613a
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFile.java
@@ -0,0 +1,2049 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.rmi.server.UID;
+import java.security.MessageDigest;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.columnar.LazyDecompressionCallback;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * <code>RCFile</code>s, short for Record Columnar File, are flat files
+ * consisting of binary key/value pairs, which share much similarity with
+ * <code>SequenceFile</code>.
+ *
+ * RCFile stores columns of a table in a record columnar way. It first
+ * partitions rows horizontally into row splits, and then it vertically
+ * partitions each row split in a columnar way. RCFile first stores the
+ * metadata of a row split as the key part of a record, and all the data of
+ * the row split as the value part. When writing, RCFile.Writer first holds
+ * records' value bytes in memory, and determines a row split if the raw byte
+ * size of the buffered records overflows a given parameter,
+ * <tt>Writer.columnsBufferSize</tt>, which can be set like:
+ * <code>conf.setInt(COLUMNS_BUFFER_SIZE_CONF_STR, 4 * 1024 * 1024)</code>.
+ * <p>
+ * <code>RCFile</code> provides a {@link Writer} and a {@link Reader} for
+ * writing and reading, respectively.
+ * </p>
+ *
+ * <p>
+ * RCFile compresses values in a more fine-grained manner than record-level
+ * compression. However, it does not yet support compressing the key part.
+ * The actual compression algorithm used to compress key and/or values can
+ * be specified by using the appropriate {@link CompressionCodec}.
+ * </p>
+ *
+ * <p>
+ * The {@link Reader} is used to read and interpret the bytes of an RCFile.
+ * </p>
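+ *
+ * <p>
+ * A minimal usage sketch of the writer side (the path is hypothetical; the
+ * column count must be set through {@code COLUMN_NUMBER_CONF_STR} before the
+ * {@link Writer} is constructed):
+ * </p>
+ * <pre>{@code
+ * Configuration conf = new Configuration();
+ * conf.setInt(RCFile.COLUMN_NUMBER_CONF_STR, 2);
+ * FileSystem fs = FileSystem.get(conf);
+ * RCFile.Writer writer = new RCFile.Writer(fs, conf, new Path("/tmp/demo.rc"));
+ * BytesRefArrayWritable row = new BytesRefArrayWritable(2);
+ * row.set(0, new BytesRefWritable("a".getBytes()));
+ * row.set(1, new BytesRefWritable("b".getBytes()));
+ * writer.append(row);
+ * writer.close();
+ * }</pre>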
+ *
+ * <h4 id="Formats">RCFile Formats</h4>
+ *
+ *
+ * <h5 id="Header">RC Header</h5>
+ * <ul>
+ * <li>version - 3 bytes of magic header <b>RCF</b>, followed by 1 byte of
+ * actual version number (e.g. RCF1)</li>
+ * <li>compression - A boolean which specifies if compression is turned on for
+ * keys/values in this file.</li>
+ * <li>compression codec - <code>CompressionCodec</code> class which is used
+ * for compression of keys and/or values (if compression is enabled).</li>
+ * <li>metadata - {@link Metadata} for this file.</li>
+ * <li>sync - A sync marker to denote end of the header.</li>
+ * </ul>
+ *
+ * <h5>RCFile Format</h5>
+ * <ul>
+ * <li><a href="#Header">Header</a></li>
+ * <li>Record
+ * <ul>
+ * <li>Key part
+ * <ul>
+ * <li>Record length in bytes</li>
+ * <li>Key length in bytes</li>
+ * <li>Number_of_rows_in_this_record(vint)</li>
+ * <li>Column_1_ondisk_length(vint)</li>
+ * <li>Column_1_row_1_value_plain_length</li>
+ * <li>Column_1_row_2_value_plain_length</li>
+ * <li>...</li>
+ * <li>Column_2_ondisk_length(vint)</li>
+ * <li>Column_2_row_1_value_plain_length</li>
+ * <li>Column_2_row_2_value_plain_length</li>
+ * <li>...</li>
+ * </ul>
+ * </li>
+ * <li>Value part
+ * <ul>
+ * <li>Compressed or plain data of [column_1_row_1_value,
+ * column_1_row_2_value,....]</li>
+ * <li>Compressed or plain data of [column_2_row_1_value,
+ * column_2_row_2_value,....]</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p>
+ * <pre>
+ * {@code
+ * The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
+ * with dashes:
+ *
+ * rcfile ::=
+ *   <file-header>
+ *   <rcfile-rowgroup>+
+ *
+ * file-header ::=
+ *   <file-version-header>
+ *   <file-key-class-name>              (only exists if version is seq6)
+ *   <file-value-class-name>            (only exists if version is seq6)
+ *   <file-is-compressed>
+ *   <file-is-block-compressed>         (only exists if version is seq6)
+ *   [<file-compression-codec-class>]
+ *   <file-header-metadata>
+ *   <file-sync-hash>
+ *
+ * -- The normative RCFile implementation included with Hive is actually
+ * -- based on a modified version of Hadoop's SequenceFile code. Some
+ * -- things which should have been modified were not, including the code
+ * -- that writes out the file version header. Consequently, RCFile and
+ * -- SequenceFile originally shared the same version header.  A newer
+ * -- release has created a unique version string.
+ *
+ * file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
+ *                     |   Byte[4] {'R', 'C', 'F', 1}
+ *
+ * -- The name of the Java class responsible for reading the key buffer
+ * -- component of the rowgroup.
+ *
+ * file-key-class-name ::=
+ *   Text {"org.apache.asterix.hivecompat.io.RCFile$KeyBuffer"}
+ *
+ * -- The name of the Java class responsible for reading the value buffer
+ * -- component of the rowgroup.
+ *
+ * file-value-class-name ::=
+ *   Text {"org.apache.asterix.hivecompat.io.RCFile$ValueBuffer"}
+ *
+ * -- Boolean variable indicating whether or not the file uses compression
+ * -- for the key and column buffer sections.
+ *
+ * file-is-compressed ::= Byte[1]
+ *
+ * -- A boolean field indicating whether or not the file is block compressed.
+ * -- This field is *always* false. According to comments in the original
+ * -- RCFile implementation this field was retained for backwards
+ * -- compatibility with the SequenceFile format.
+ *
+ * file-is-block-compressed ::= Byte[1] {false}
+ *
+ * -- The Java class name of the compression codec iff <file-is-compressed>
+ * -- is true. The named class must implement
+ * -- org.apache.hadoop.io.compress.CompressionCodec.
+ * -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
+ *
+ * file-compression-codec-class ::= Text
+ *
+ * -- A collection of key-value pairs defining metadata values for the
+ * -- file. The Map is serialized using standard JDK serialization, i.e.
+ * -- an Int corresponding to the number of key-value pairs, followed by
+ * -- Text key and value pairs. The following metadata properties are
+ * -- mandatory for all RCFiles:
+ * --
+ * -- hive.io.rcfile.column.number: the number of columns in the RCFile
+ *
+ * file-header-metadata ::= Map<Text, Text>
+ *
+ * -- A 16 byte marker that is generated by the writer. This marker appears
+ * -- at regular intervals at the beginning of rowgroup-headers, and is
+ * -- intended to enable readers to skip over corrupted rowgroups.
+ *
+ * file-sync-hash ::= Byte[16]
+ *
+ * -- Each row group is split into three sections: a header, a set of
+ * -- key buffers, and a set of column buffers. The header section includes
+ * -- an optional sync hash, information about the size of the row group, and
+ * -- the total number of rows in the row group. Each key buffer
+ * -- consists of run-length encoding data which is used to decode
+ * -- the length and offsets of individual fields in the corresponding column
+ * -- buffer.
+ *
+ * rcfile-rowgroup ::=
+ *   <rowgroup-header>
+ *   <rowgroup-key-data>
+ *   <rowgroup-column-buffers>
+ *
+ * rowgroup-header ::=
+ *   [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
+ *   <rowgroup-record-length>
+ *   <rowgroup-key-length>
+ *   <rowgroup-compressed-key-length>
+ *
+ * -- rowgroup-key-data is compressed if the column data is compressed.
+ * rowgroup-key-data ::=
+ *   <rowgroup-num-rows>
+ *   <rowgroup-key-buffers>
+ *
+ * -- An integer (always -1) signaling the beginning of a sync-hash
+ * -- field.
+ *
+ * rowgroup-sync-marker ::= Int
+ *
+ * -- A 16 byte sync field. This must match the <file-sync-hash> value read
+ * -- in the file header.
+ *
+ * rowgroup-sync-hash ::= Byte[16]
+ *
+ * -- The record-length is the sum of the number of bytes used to store
+ * -- the key and column parts, i.e. it is the total length of the current
+ * -- rowgroup.
+ *
+ * rowgroup-record-length ::= Int
+ *
+ * -- Total length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-key-length ::= Int
+ *
+ * -- Total compressed length in bytes of the rowgroup's key sections.
+ *
+ * rowgroup-compressed-key-length ::= Int
+ *
+ * -- Number of rows in the current rowgroup.
+ *
+ * rowgroup-num-rows ::= VInt
+ *
+ * -- One or more column key buffers corresponding to each column
+ * -- in the RCFile.
+ *
+ * rowgroup-key-buffers ::= <rowgroup-key-buffer>+
+ *
+ * -- Data in each column buffer is stored using a run-length
+ * -- encoding scheme that is intended to reduce the cost of
+ * -- repeated column field values. This mechanism is described
+ * -- in more detail in the following entries.
+ *
+ * rowgroup-key-buffer ::=
+ *   <column-buffer-length>
+ *   <column-buffer-uncompressed-length>
+ *   <column-key-buffer-length>
+ *   <column-key-buffer>
+ *
+ * -- The serialized length on disk of the corresponding column buffer.
+ *
+ * column-buffer-length ::= VInt
+ *
+ * -- The uncompressed length of the corresponding column buffer. This
+ * -- is equivalent to column-buffer-length if the RCFile is not compressed.
+ *
+ * column-buffer-uncompressed-length ::= VInt
+ *
+ * -- The length in bytes of the current column key buffer
+ *
+ * column-key-buffer-length ::= VInt
+ *
+ * -- The column-key-buffer contains a sequence of serialized VInt values
+ * -- corresponding to the byte lengths of the serialized column fields
+ * -- in the corresponding rowgroup-column-buffer. For example, consider
+ * -- an integer column that contains the consecutive values 1, 2, 3, 44.
+ * -- The RCFile format stores these values as strings in the column buffer,
+ * -- e.g. "12344". The length of each column field is recorded in
+ * -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
+ * -- if the same length occurs repeatedly, then we replace repeated
+ * -- run lengths with the complement (i.e. negative) of the number of
+ * -- repetitions, so 1,1,1,2 becomes 1,~2,2.
+ *
+ * column-key-buffer ::= Byte[column-key-buffer-length]
+ *
+ * rowgroup-column-buffers ::= <rowgroup-column-buffer>+
+ *
+ * -- RCFile stores all column data as strings regardless of the
+ * -- underlying column type. The strings are neither length-prefixed nor
+ * -- null-terminated, and decoding them into individual fields requires
+ * -- the use of the run-length information contained in the corresponding
+ * -- column-key-buffer.
+ *
+ * rowgroup-column-buffer ::= Byte[column-buffer-length]
+ *
+ * Byte ::= An eight-bit byte
+ *
+ * VInt ::= Variable length integer. The high-order bit of each byte
+ * indicates whether more bytes remain to be read. The low-order seven
+ * bits are appended as increasingly more significant bits in the
+ * resulting integer value.
+ *
+ * Int ::= A four-byte integer in big-endian format.
+ *
+ * Text ::= VInt, Chars (Length prefixed UTF-8 characters)
+ * }
+ * </pre>
+ * </p>
+ */
+public class RCFile {
+
+  private static final Log LOG = LogFactory.getLog(RCFile.class);
+
+  public static final String RECORD_INTERVAL_CONF_STR = "hive.io.rcfile.record.interval";
+
+  public static final String COLUMN_NUMBER_METADATA_STR = "hive.io.rcfile.column.number";
+
+  public static final String COLUMN_NUMBER_CONF_STR = "hive.io.rcfile.column.number.conf";
+
+  public static final String TOLERATE_CORRUPTIONS_CONF_STR =
+    "hive.io.rcfile.tolerate.corruptions";
+
+  // HACK: We actually need BlockMissingException, but that is not available
+  // in all hadoop versions.
+  public static final String BLOCK_MISSING_MESSAGE =
+    "Could not obtain block";
+
+  // All of the versions should be placed in this list.
+  private static final int ORIGINAL_VERSION = 0;  // version with SEQ
+  private static final int NEW_MAGIC_VERSION = 1; // version with RCF
+
+  private static final int CURRENT_VERSION = NEW_MAGIC_VERSION;
+
+  // The first version of RCFile used the sequence file header.
+  private static final byte[] ORIGINAL_MAGIC = new byte[] {
+      (byte) 'S', (byte) 'E', (byte) 'Q'};
+  // the version that was included with the original magic, which is mapped
+  // into ORIGINAL_VERSION
+  private static final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
+
+  private static final byte[] ORIGINAL_MAGIC_VERSION = new byte[] {
+    (byte) 'S', (byte) 'E', (byte) 'Q', ORIGINAL_MAGIC_VERSION_WITH_METADATA
+  };
+
+  // The 'magic' bytes at the beginning of the RCFile
+  private static final byte[] MAGIC = new byte[] {
+    (byte) 'R', (byte) 'C', (byte) 'F'};
+
+  private static final int SYNC_ESCAPE = -1; // "length" of sync entries
+  private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
+  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
+
+  /** The number of bytes between sync points. */
+  public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
+
+  /**
+   * KeyBuffer is the key of each record in RCFile. Its on-disk layout is as
+   * below:
+   *
+   * <ul>
+   * <li>record length in bytes: the sum of the bytes used to store the key
+   * part and the value part.</li>
+   * <li>key length in bytes: the number of bytes used by the key part.</li>
+   * <li>number_of_rows_in_this_record(vint),</li>
+   * <li>column_1_ondisk_length(vint),</li>
+   * <li>column_1_row_1_value_plain_length,</li>
+   * <li>column_1_row_2_value_plain_length,</li>
+   * <li>....</li>
+   * <li>column_2_ondisk_length(vint),</li>
+   * <li>column_2_row_1_value_plain_length,</li>
+   * <li>column_2_row_2_value_plain_length,</li>
+   * <li>....</li>
+   * <li>{the end of the key part}</li>
+   * </ul>
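+   *
+   * <p>
+   * As a hypothetical example: a split of 3 rows of a single column whose
+   * cells are all 4 bytes long is keyed by numberRows=3 (vint), followed by
+   * that column's on-disk length, its uncompressed length, and the
+   * run-length encoded cell lengths 4,~2 (one literal length plus a run of
+   * two repeats).
+   * </p>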
+   */
+  public static class KeyBuffer implements WritableComparable {
+    // each column's length in the value
+    private int[] eachColumnValueLen = null;
+    private int[] eachColumnUncompressedValueLen = null;
+    // stores each cell's length of a column in one DataOutputBuffer element
+    private NonSyncDataOutputBuffer[] allCellValLenBuffer = null;
+    // how many rows in this split
+    private int numberRows = 0;
+    // how many columns
+    private int columnNumber = 0;
+
+    // return the number of columns recorded in this file's header
+    public int getColumnNumber() {
+      return columnNumber;
+    }
+
+    @SuppressWarnings("unused")
+    @Deprecated
+    public KeyBuffer(){
+    }
+
+    KeyBuffer(int columnNum) {
+      columnNumber = columnNum;
+      eachColumnValueLen = new int[columnNumber];
+      eachColumnUncompressedValueLen = new int[columnNumber];
+      allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
+    }
+
+    @SuppressWarnings("unused")
+    @Deprecated
+    KeyBuffer(int numberRows, int columnNum) {
+      this(columnNum);
+      this.numberRows = numberRows;
+    }
+
+    public void nullColumn(int columnIndex) {
+      eachColumnValueLen[columnIndex] = 0;
+      eachColumnUncompressedValueLen[columnIndex] = 0;
+      allCellValLenBuffer[columnIndex] = new NonSyncDataOutputBuffer();
+    }
+
+    /**
+     * Add a new column's metadata.
+     *
+     * @param columnValueLen
+     *          the total number of bytes of this column's values in this split
+     * @param colValLenBuffer
+     *          each cell's length of this column in this split
+     */
+    void setColumnLenInfo(int columnValueLen,
+        NonSyncDataOutputBuffer colValLenBuffer,
+        int columnUncompressedValueLen, int columnIndex) {
+      eachColumnValueLen[columnIndex] = columnValueLen;
+      eachColumnUncompressedValueLen[columnIndex] = columnUncompressedValueLen;
+      allCellValLenBuffer[columnIndex] = colValLenBuffer;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      eachColumnValueLen = new int[columnNumber];
+      eachColumnUncompressedValueLen = new int[columnNumber];
+      allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
+
+      numberRows = WritableUtils.readVInt(in);
+      for (int i = 0; i < columnNumber; i++) {
+        eachColumnValueLen[i] = WritableUtils.readVInt(in);
+        eachColumnUncompressedValueLen[i] = WritableUtils.readVInt(in);
+        int bufLen = WritableUtils.readVInt(in);
+        if (allCellValLenBuffer[i] == null) {
+          allCellValLenBuffer[i] = new NonSyncDataOutputBuffer();
+        } else {
+          allCellValLenBuffer[i].reset();
+        }
+        allCellValLenBuffer[i].write(in, bufLen);
+      }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      // out.writeInt(numberRows);
+      WritableUtils.writeVLong(out, numberRows);
+      for (int i = 0; i < eachColumnValueLen.length; i++) {
+        WritableUtils.writeVLong(out, eachColumnValueLen[i]);
+        WritableUtils.writeVLong(out, eachColumnUncompressedValueLen[i]);
+        NonSyncDataOutputBuffer colRowsLenBuf = allCellValLenBuffer[i];
+        int bufLen = colRowsLenBuf.getLength();
+        WritableUtils.writeVLong(out, bufLen);
+        out.write(colRowsLenBuf.getData(), 0, bufLen);
+      }
+    }
+
+    /**
+     * get number of bytes to store the keyBuffer.
+     *
+     * @return number of bytes used to store this KeyBuffer on disk
+     * @throws IOException
+     */
+    public int getSize() throws IOException {
+      int ret = 0;
+      ret += WritableUtils.getVIntSize(numberRows);
+      for (int i = 0; i < eachColumnValueLen.length; i++) {
+        ret += WritableUtils.getVIntSize(eachColumnValueLen[i]);
+        ret += WritableUtils.getVIntSize(eachColumnUncompressedValueLen[i]);
+        ret += WritableUtils.getVIntSize(allCellValLenBuffer[i].getLength());
+        ret += allCellValLenBuffer[i].getLength();
+      }
+
+      return ret;
+    }
+
+    @Override
+    public int compareTo(Object arg0) {
+      throw new RuntimeException("compareTo not supported in class "
+          + this.getClass().getName());
+    }
+
+    public int[] getEachColumnUncompressedValueLen() {
+      return eachColumnUncompressedValueLen;
+    }
+
+    public int[] getEachColumnValueLen() {
+      return eachColumnValueLen;
+    }
+
+    /**
+     * @return the numberRows
+     */
+    public int getNumberRows() {
+      return numberRows;
+    }
+  }
+
+  /**
+   * ValueBuffer is the value of each record in RCFile. Its on-disk layout is
+   * as below:
+   * <ul>
+   * <li>Compressed or plain data of [column_1_row_1_value,
+   * column_1_row_2_value,....]</li>
+   * <li>Compressed or plain data of [column_2_row_1_value,
+   * column_2_row_2_value,....]</li>
+   * </ul>
+   */
+  public static class ValueBuffer implements WritableComparable {
+
+    class LazyDecompressionCallbackImpl implements LazyDecompressionCallback {
+
+      int index = -1;
+      int colIndex = -1;
+
+      public LazyDecompressionCallbackImpl(int index, int colIndex) {
+        super();
+        this.index = index;
+        this.colIndex = colIndex;
+      }
+
+      @Override
+      public byte[] decompress() throws IOException {
+
+        if (decompressedFlag[index] || codec == null) {
+          return loadedColumnsValueBuffer[index].getData();
+        }
+
+        NonSyncDataOutputBuffer compressedData = compressedColumnsValueBuffer[index];
+        decompressBuffer.reset();
+        DataInputStream valueIn = new DataInputStream(deflatFilter);
+        deflatFilter.resetState();
+        if (deflatFilter instanceof SchemaAwareCompressionInputStream) {
+          ((SchemaAwareCompressionInputStream)deflatFilter).setColumnIndex(colIndex);
+        }
+        decompressBuffer.reset(compressedData.getData(),
+            keyBuffer.eachColumnValueLen[colIndex]);
+
+        NonSyncDataOutputBuffer decompressedColBuf = loadedColumnsValueBuffer[index];
+        decompressedColBuf.reset();
+        decompressedColBuf.write(valueIn,
+            keyBuffer.eachColumnUncompressedValueLen[colIndex]);
+        decompressedFlag[index] = true;
+        numCompressed--;
+        return decompressedColBuf.getData();
+      }
+    }
+
+    // used to load columns' value into memory
+    private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
+    private NonSyncDataOutputBuffer[] compressedColumnsValueBuffer = null;
+    private boolean[] decompressedFlag = null;
+    private int numCompressed;
+    private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
+    private boolean lazyDecompress = true;
+
+    boolean inited = false;
+
+    // used for readFields
+    KeyBuffer keyBuffer;
+    private int columnNumber = 0;
+
+    // set true for columns that needed to skip loading into memory.
+    boolean[] skippedColIDs = null;
+
+    CompressionCodec codec;
+
+    Decompressor valDecompressor = null;
+    NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
+    CompressionInputStream deflatFilter = null;
+
+    @SuppressWarnings("unused")
+    @Deprecated
+    public ValueBuffer() throws IOException {
+    }
+
+    @SuppressWarnings("unused")
+    @Deprecated
+    public ValueBuffer(KeyBuffer keyBuffer) throws IOException {
+      this(keyBuffer, keyBuffer.columnNumber, null, null, true);
+    }
+
+    @SuppressWarnings("unused")
+    @Deprecated
+    public ValueBuffer(KeyBuffer keyBuffer, boolean[] skippedColIDs)
+        throws IOException {
+      this(keyBuffer, keyBuffer.columnNumber, skippedColIDs, null, true);
+    }
+
+    @SuppressWarnings("unused")
+    @Deprecated
+    public ValueBuffer(KeyBuffer currentKey, int columnNumber,
+        boolean[] skippedCols, CompressionCodec codec) throws IOException {
+      this(currentKey, columnNumber, skippedCols, codec, true);
+    }
+
+    public ValueBuffer(KeyBuffer currentKey, int columnNumber,
+      boolean[] skippedCols, CompressionCodec codec, boolean lazyDecompress)
+        throws IOException {
+      this.lazyDecompress = lazyDecompress;
+      keyBuffer = currentKey;
+      this.columnNumber = columnNumber;
+
+      if (skippedCols != null && skippedCols.length > 0) {
+        skippedColIDs = skippedCols;
+      } else {
+        skippedColIDs = new boolean[columnNumber];
+        for (int i = 0; i < skippedColIDs.length; i++) {
+          skippedColIDs[i] = false;
+        }
+      }
+
+      int skipped = 0;
+      for (boolean currentSkip : skippedColIDs) {
+        if (currentSkip) {
+          skipped++;
+        }
+      }
+      loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
+          - skipped];
+      decompressedFlag = new boolean[columnNumber - skipped];
+      lazyDecompressCallbackObjs = new LazyDecompressionCallbackImpl[columnNumber
+          - skipped];
+      compressedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
+                                                                 - skipped];
+      this.codec = codec;
+      if (codec != null) {
+        valDecompressor = CodecPool.getDecompressor(codec);
+        deflatFilter = codec.createInputStream(decompressBuffer,
+            valDecompressor);
+      }
+      if (codec != null) {
+        numCompressed = decompressedFlag.length;
+      } else {
+        numCompressed = 0;
+      }
+      for (int k = 0, readIndex = 0; k < columnNumber; k++) {
+        if (skippedColIDs[k]) {
+          continue;
+        }
+        loadedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
+        if (codec != null) {
+          decompressedFlag[readIndex] = false;
+          lazyDecompressCallbackObjs[readIndex] = new LazyDecompressionCallbackImpl(
+              readIndex, k);
+          compressedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
+        } else {
+          decompressedFlag[readIndex] = true;
+        }
+        readIndex++;
+      }
+    }
+
+    @SuppressWarnings("unused")
+    @Deprecated
+    public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer,
+        int addIndex) {
+      loadedColumnsValueBuffer[addIndex] = valBuffer;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int addIndex = 0;
+      int skipTotal = 0;
+      for (int i = 0; i < columnNumber; i++) {
+        int vaRowsLen = keyBuffer.eachColumnValueLen[i];
+        // skip this column
+        if (skippedColIDs[i]) {
+          skipTotal += vaRowsLen;
+          continue;
+        }
+
+        if (skipTotal != 0) {
+          in.skipBytes(skipTotal);
+          skipTotal = 0;
+        }
+
+        NonSyncDataOutputBuffer valBuf;
+        if (codec != null){
+           // load into compressed buf first
+          valBuf = compressedColumnsValueBuffer[addIndex];
+        } else {
+          valBuf = loadedColumnsValueBuffer[addIndex];
+        }
+        valBuf.reset();
+        valBuf.write(in, vaRowsLen);
+        if (codec != null) {
+          decompressedFlag[addIndex] = false;
+          if (!lazyDecompress) {
+            lazyDecompressCallbackObjs[addIndex].decompress();
+            decompressedFlag[addIndex] = true;
+          }
+        }
+        addIndex++;
+      }
+      if (codec != null) {
+        numCompressed = decompressedFlag.length;
+      }
+
+      if (skipTotal != 0) {
+        in.skipBytes(skipTotal);
+      }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      if (codec != null) {
+        for (NonSyncDataOutputBuffer currentBuf : compressedColumnsValueBuffer) {
+          out.write(currentBuf.getData(), 0, currentBuf.getLength());
+        }
+      } else {
+        for (NonSyncDataOutputBuffer currentBuf : loadedColumnsValueBuffer) {
+          out.write(currentBuf.getData(), 0, currentBuf.getLength());
+        }
+      }
+    }
+
+    public void nullColumn(int columnIndex) {
+      if (codec != null) {
+        compressedColumnsValueBuffer[columnIndex].reset();
+      } else {
+        loadedColumnsValueBuffer[columnIndex].reset();
+      }
+    }
+
+    public void clearColumnBuffer() throws IOException {
+      decompressBuffer.reset();
+    }
+
+    public void close() {
+      for (NonSyncDataOutputBuffer element : loadedColumnsValueBuffer) {
+        IOUtils.closeStream(element);
+      }
+      if (codec != null) {
+        IOUtils.closeStream(decompressBuffer);
+        if (valDecompressor != null) {
+          // Make sure we only return valDecompressor once.
+          CodecPool.returnDecompressor(valDecompressor);
+          valDecompressor = null;
+        }
+      }
+    }
+
+    @Override
+    public int compareTo(Object arg0) {
+      throw new RuntimeException("compareTo not supported in class "
+          + this.getClass().getName());
+    }
+  }
+
+  /**
+   * Create a metadata object with alternating key-value pairs.
+   * E.g. metadata(key1, value1, key2, value2)
+   */
+  public static Metadata createMetadata(Text... values) {
+    if (values.length % 2 != 0) {
+      throw new IllegalArgumentException("Must have a matched set of " +
+                                         "key-value pairs. " + values.length+
+                                         " strings supplied.");
+    }
+    Metadata result = new Metadata();
+    for(int i=0; i < values.length; i += 2) {
+      result.set(values[i], values[i+1]);
+    }
+    return result;
+  }
+
+  /**
+   * Write KeyBuffer/ValueBuffer pairs to a RCFile. RCFile's format is
+   * compatible with SequenceFile's.
+   *
+   */
+  public static class Writer {
+
+    Configuration conf;
+    FSDataOutputStream out;
+
+    CompressionCodec codec = null;
+    Metadata metadata = null;
+
+    // Insert a globally unique 16-byte value every few entries, so that one
+    // can seek into the middle of a file and then synchronize with record
+    // starts and ends by scanning for this value.
+    long lastSyncPos; // position of last sync
+    byte[] sync; // 16 random bytes
+    {
+      try {
+        MessageDigest digester = MessageDigest.getInstance("MD5");
+        long time = System.currentTimeMillis();
+        digester.update((new UID() + "@" + time).getBytes());
+        sync = digester.digest();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    // how many records the writer buffers before it writes to disk
+    private int RECORD_INTERVAL = Integer.MAX_VALUE;
+    // the max size of memory for buffering records before writing them out
+    private int columnsBufferSize = 4 * 1024 * 1024; // 4M
+    // the conf string for COLUMNS_BUFFER_SIZE
+    public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+
+    // how many records already buffered
+    private int bufferedRecords = 0;
+
+    private final ColumnBuffer[] columnBuffers;
+
+    private int columnNumber = 0;
+
+    private final int[] columnValuePlainLength;
+
+    KeyBuffer key = null;
+    private final int[] plainTotalColumnLength;
+    private final int[] comprTotalColumnLength;
+
+    boolean useNewMagic = true;
+
+    /*
+     * used for buffering appends before flushing them out
+     */
+    class ColumnBuffer {
+      // used for buffer a column's values
+      NonSyncDataOutputBuffer columnValBuffer;
+      // used to store each value's length
+      NonSyncDataOutputBuffer valLenBuffer;
+
+      /*
+       * use a run-length encoding. We only record a run length if the same
+       * 'prevValueLen' occurs more than once, and we negate the run length
+       * to distinguish a run length from a normal value length. For example,
+       * if the values' lengths are 1,1,1,2, we record 1,~2,2; for value
+       * lengths 1,2,3 we record 1,2,3.
+       */
+      int runLength = 0;
+      int prevValueLength = -1;
+
+      ColumnBuffer() throws IOException {
+        columnValBuffer = new NonSyncDataOutputBuffer();
+        valLenBuffer = new NonSyncDataOutputBuffer();
+      }
+
+      public void append(BytesRefWritable data) throws IOException {
+        data.writeDataTo(columnValBuffer);
+        int currentLen = data.getLength();
+
+        if (prevValueLength < 0) {
+          startNewGroup(currentLen);
+          return;
+        }
+
+        if (currentLen != prevValueLength) {
+          flushGroup();
+          startNewGroup(currentLen);
+        } else {
+          runLength++;
+        }
+      }
+
+      private void startNewGroup(int currentLen) {
+        prevValueLength = currentLen;
+        runLength = 0;
+      }
+
+      public void clear() throws IOException {
+        valLenBuffer.reset();
+        columnValBuffer.reset();
+        prevValueLength = -1;
+        runLength = 0;
+      }
+
+      public void flushGroup() throws IOException {
+        if (prevValueLength >= 0) {
+          WritableUtils.writeVLong(valLenBuffer, prevValueLength);
+          if (runLength > 0) {
+            WritableUtils.writeVLong(valLenBuffer, ~runLength);
+          }
+          runLength = -1;
+          prevValueLength = -1;
+        }
+      }
+    }
+
+    public long getLength() throws IOException {
+      return out.getPos();
+    }
+
+    /** Constructs a RCFile Writer. */
+    public Writer(FileSystem fs, Configuration conf, Path name) throws IOException {
+      this(fs, conf, name, null, new Metadata(), null);
+    }
+
+    /**
+     * Constructs a RCFile Writer.
+     *
+     * @param fs
+     *          the file system used
+     * @param conf
+     *          the configuration
+     * @param name
+     *          the file name
+     * @throws IOException
+     */
+    public Writer(FileSystem fs, Configuration conf, Path name,
+        Progressable progress, CompressionCodec codec) throws IOException {
+      this(fs, conf, name, progress, new Metadata(), codec);
+    }
+
+    /**
+     * Constructs a RCFile Writer.
+     *
+     * @param fs
+     *          the file system used
+     * @param conf
+     *          the configuration
+     * @param name
+     *          the file name
+     * @param progress a progress meter to update as the file is written
+     * @param metadata a string to string map in the file header
+     * @throws IOException
+     */
+    public Writer(FileSystem fs, Configuration conf, Path name,
+        Progressable progress, Metadata metadata, CompressionCodec codec) throws IOException {
+      this(fs, conf, name, fs.getConf().getInt("io.file.buffer.size", 4096),
+              ShimLoader.getHadoopShims().getDefaultReplication(fs, name),
+              ShimLoader.getHadoopShims().getDefaultBlockSize(fs, name), progress,
+          metadata, codec);
+    }
+
+    /**
+     *
+     * Constructs a RCFile Writer.
+     *
+     * @param fs
+     *          the file system used
+     * @param conf
+     *          the configuration
+     * @param name
+     *          the file name
+     * @param bufferSize the size of the file buffer
+     * @param replication the number of replicas for the file
+     * @param blockSize the block size of the file
+     * @param progress the progress meter for writing the file
+     * @param metadata a string to string map in the file header
+     * @throws IOException
+     */
+    public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize,
+        short replication, long blockSize, Progressable progress,
+        Metadata metadata, CompressionCodec codec) throws IOException {
+      RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
+      columnNumber = conf.getInt(COLUMN_NUMBER_CONF_STR, 0);
+
+      if (metadata == null) {
+        metadata = new Metadata();
+      }
+      metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text(""
+          + columnNumber));
+
+      columnsBufferSize = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR,
+          4 * 1024 * 1024);
+
+      columnValuePlainLength = new int[columnNumber];
+
+      columnBuffers = new ColumnBuffer[columnNumber];
+      for (int i = 0; i < columnNumber; i++) {
+        columnBuffers[i] = new ColumnBuffer();
+      }
+
+      init(conf, fs.create(name, true, bufferSize, replication,
+        blockSize, progress), codec, metadata);
+      initializeFileHeader();
+      writeFileHeader();
+      finalizeFileHeader();
+      key = new KeyBuffer(columnNumber);
+
+      plainTotalColumnLength = new int[columnNumber];
+      comprTotalColumnLength = new int[columnNumber];
+    }
+
+    /** Write the initial part of file header. */
+    void initializeFileHeader() throws IOException {
+      if (useNewMagic) {
+        out.write(MAGIC);
+        out.write(CURRENT_VERSION);
+      } else {
+        out.write(ORIGINAL_MAGIC_VERSION);
+      }
+    }
+
+    /** Write the final part of file header. */
+    void finalizeFileHeader() throws IOException {
+      out.write(sync); // write the sync bytes
+      out.flush(); // flush header
+    }
+
+    boolean isCompressed() {
+      return codec != null;
+    }
+
+    /** Write and flush the file header. */
+    void writeFileHeader() throws IOException {
+      if (useNewMagic) {
+        out.writeBoolean(isCompressed());
+      } else {
+        Text.writeString(out, KeyBuffer.class.getName());
+        Text.writeString(out, ValueBuffer.class.getName());
+        out.writeBoolean(isCompressed());
+        out.writeBoolean(false);
+      }
+
+      if (isCompressed()) {
+        Text.writeString(out, (codec.getClass()).getName());
+      }
+      metadata.write(out);
+    }
+
+    void init(Configuration conf, FSDataOutputStream out,
+        CompressionCodec codec, Metadata metadata) throws IOException {
+      this.conf = conf;
+      this.out = out;
+      this.codec = codec;
+      this.metadata = metadata;
+      this.useNewMagic =
+          conf.getBoolean(HiveConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
+    }
+
+    /** Returns the compression codec of data in this file. */
+    @SuppressWarnings("unused")
+    @Deprecated
+    public CompressionCodec getCompressionCodec() {
+      return codec;
+    }
+
+    /** create a sync point. */
+    public void sync() throws IOException {
+      if (sync != null && lastSyncPos != out.getPos()) {
+        out.writeInt(SYNC_ESCAPE); // mark the start of the sync
+        out.write(sync); // write sync
+        lastSyncPos = out.getPos(); // update lastSyncPos
+      }
+    }
+
+    /** Returns the configuration of this file. */
+    @SuppressWarnings("unused")
+    @Deprecated
+    Configuration getConf() {
+      return conf;
+    }
+
+    private void checkAndWriteSync() throws IOException {
+      if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
+        sync();
+      }
+    }
+
+    private int columnBufferSize = 0;
+
+    /**
+     * Append a row of values. Currently it can only accept
+     * {@link BytesRefArrayWritable}. If its <code>size()</code> is less than
+     * the column number in the file, zero bytes are appended for the empty
+     * columns. If its size() is greater than the column number in the file,
+     * the exceeding columns' bytes are ignored.
+     *
+     * @param val a BytesRefArrayWritable with the list of serialized columns
+     * @throws IOException
+     */
+    public void append(Writable val) throws IOException {
+
+      if (!(val instanceof BytesRefArrayWritable)) {
+        throw new UnsupportedOperationException(
+            "Currently the writer can only accept BytesRefArrayWritable");
+      }
+
+      BytesRefArrayWritable columns = (BytesRefArrayWritable) val;
+      int size = columns.size();
+      for (int i = 0; i < size; i++) {
+        BytesRefWritable cu = columns.get(i);
+        int plainLen = cu.getLength();
+        columnBufferSize += plainLen;
+        columnValuePlainLength[i] += plainLen;
+        columnBuffers[i].append(cu);
+      }
+
+      if (size < columnNumber) {
+        for (int i = columns.size(); i < columnNumber; i++) {
+          columnBuffers[i].append(BytesRefWritable.ZeroBytesRefWritable);
+        }
+      }
+
+      bufferedRecords++;
+      if ((columnBufferSize > columnsBufferSize)
+          || (bufferedRecords >= RECORD_INTERVAL)) {
+        flushRecords();
+      }
+    }
+
+    private void flushRecords() throws IOException {
+
+      key.numberRows = bufferedRecords;
+
+      Compressor compressor = null;
+      NonSyncDataOutputBuffer valueBuffer = null;
+      CompressionOutputStream deflateFilter = null;
+      DataOutputStream deflateOut = null;
+      boolean isCompressed = isCompressed();
+      int valueLength = 0;
+      if (isCompressed) {
+        ReflectionUtils.setConf(codec, this.conf);
+        compressor = CodecPool.getCompressor(codec);
+        valueBuffer = new NonSyncDataOutputBuffer();
+        deflateFilter = codec.createOutputStream(valueBuffer, compressor);
+        deflateOut = new DataOutputStream(deflateFilter);
+      }
+
+      for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
+        ColumnBuffer currentBuf = columnBuffers[columnIndex];
+        currentBuf.flushGroup();
+
+        NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
+        int colLen;
+        int plainLen = columnValuePlainLength[columnIndex];
+
+        if (isCompressed) {
+          if (deflateFilter instanceof SchemaAwareCompressionOutputStream) {
+            ((SchemaAwareCompressionOutputStream)deflateFilter).
+              setColumnIndex(columnIndex);
+          }
+          deflateFilter.resetState();
+          deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
+          deflateOut.flush();
+          deflateFilter.finish();
+          // find how much compressed data was added for this column
+          colLen = valueBuffer.getLength() - valueLength;
+        } else {
+          colLen = columnValuePlainLength[columnIndex];
+        }
+        valueLength += colLen;
+        key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, plainLen,
+          columnIndex);
+        plainTotalColumnLength[columnIndex] += plainLen;
+        comprTotalColumnLength[columnIndex] += colLen;
+        columnValuePlainLength[columnIndex] = 0;
+      }
+
+      int keyLength = key.getSize();
+      if (keyLength < 0) {
+        throw new IOException("negative length keys not allowed: " + key);
+      }
+      if (compressor != null) {
+        CodecPool.returnCompressor(compressor);
+      }
+
+      // Write the key out
+      writeKey(key, keyLength + valueLength, keyLength);
+      // write the value out
+      if (isCompressed) {
+        out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
+      } else {
+        for(int columnIndex=0; columnIndex < columnNumber; ++columnIndex) {
+          NonSyncDataOutputBuffer buf =
+            columnBuffers[columnIndex].columnValBuffer;
+          out.write(buf.getData(), 0, buf.getLength());
+        }
+      }
+
+      // clear the columnBuffers
+      clearColumnBuffers();
+
+      bufferedRecords = 0;
+      columnBufferSize = 0;
+    }
+
+    /**
+     * flush a block out without doing anything except compressing the key part.
+     */
+    public void flushBlock(KeyBuffer keyBuffer, ValueBuffer valueBuffer,
+        int recordLen, int keyLength,
+        @SuppressWarnings("unused") int compressedKeyLen) throws IOException {
+      writeKey(keyBuffer, recordLen, keyLength);
+      valueBuffer.write(out);
+    }
+
+    private void writeKey(KeyBuffer keyBuffer, int recordLen,
+                          int keyLength) throws IOException {
+      checkAndWriteSync(); // sync
+      out.writeInt(recordLen); // total record length
+      out.writeInt(keyLength); // key portion length
+
+      if(this.isCompressed()) {
+        Compressor compressor = CodecPool.getCompressor(codec);
+        NonSyncDataOutputBuffer compressionBuffer =
+          new NonSyncDataOutputBuffer();
+        CompressionOutputStream deflateFilter =
+          codec.createOutputStream(compressionBuffer, compressor);
+        DataOutputStream deflateOut = new DataOutputStream(deflateFilter);
+        //compress key and write key out
+        compressionBuffer.reset();
+        deflateFilter.resetState();
+        keyBuffer.write(deflateOut);
+        deflateOut.flush();
+        deflateFilter.finish();
+        int compressedKeyLen = compressionBuffer.getLength();
+        out.writeInt(compressedKeyLen);
+        out.write(compressionBuffer.getData(), 0, compressedKeyLen);
+        CodecPool.returnCompressor(compressor);
+      } else {
+        out.writeInt(keyLength);
+        keyBuffer.write(out);
+      }
+    }
+
+    private void clearColumnBuffers() throws IOException {
+      for (int i = 0; i < columnNumber; i++) {
+        columnBuffers[i].clear();
+      }
+    }
+
+    public synchronized void close() throws IOException {
+      if (bufferedRecords > 0) {
+        flushRecords();
+      }
+      clearColumnBuffers();
+
+      if (out != null) {
+
+        // Close the underlying stream if we own it...
+        out.flush();
+        out.close();
+        out = null;
+      }
+      for (int i = 0; i < columnNumber; i++) {
+        LOG.info("Column#" + i + " : Plain Total Column Value Length: "
+          + plainTotalColumnLength[i]
+          + ",  Compr Total Column Value Length: " + 
comprTotalColumnLength[i]);
+      }
+    }
+  }
+
+  /**
+   * Read KeyBuffer/ValueBuffer pairs from a RCFile.
+   *
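+   * <p>
+   * A minimal read-loop sketch, given a {@code FileSystem fs} and
+   * {@code Configuration conf} (this assumes the usual
+   * {@code next(LongWritable)} / {@code getCurrentRow(BytesRefArrayWritable)}
+   * pair carried over from the upstream Hive RCFile reader):
+   * </p>
+   * <pre>{@code
+   * Reader reader = new Reader(fs, new Path("/tmp/demo.rc"), conf);
+   * LongWritable rowID = new LongWritable();
+   * BytesRefArrayWritable cols = new BytesRefArrayWritable();
+   * while (reader.next(rowID)) {
+   *   reader.getCurrentRow(cols);
+   *   // each element of cols holds one column's raw bytes for this row
+   * }
+   * reader.close();
+   * }</pre>
+   *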
+   */
+  public static class Reader {
+    private static class SelectedColumn {
+      public int colIndex;
+      public int rowReadIndex;
+      public int runLength;
+      public int prvLength;
+      public boolean isNulled;
+    }
+    private final Path file;
+    private final FSDataInputStream in;
+
+    private byte version;
+
+    private CompressionCodec codec = null;
+    private Metadata metadata = null;
+
+    private final byte[] sync = new byte[SYNC_HASH_SIZE];
+    private final byte[] syncCheck = new byte[SYNC_HASH_SIZE];
+    private boolean syncSeen;
+    private long lastSeenSyncPos = 0;
+
+    private long headerEnd;
+    private final long end;
+    private int currentKeyLength;
+    private int currentRecordLength;
+
+    private final Configuration conf;
+
+    private final ValueBuffer currentValue;
+
+    private int readRowsIndexInBuffer = 0;
+
+    private int recordsNumInValBuffer = 0;
+
+    private int columnNumber = 0;
+
+    private int loadColumnNum;
+
+    private int passedRowsNum = 0;
+
+    // Should we try to tolerate corruption? Default is No.
+    private boolean tolerateCorruptions = false;
+
+    private boolean decompress = false;
+
+    private Decompressor keyDecompressor;
+    NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
+
+    //Current state of each selected column - e.g. current run length, etc.
+    // The size of the array is equal to the number of selected columns
+    private final SelectedColumn[] selectedColumns;
+
+    // map of original column id -> index among selected columns
+    private final int[] revPrjColIDs;
+
+    // column value lengths for each of the selected columns
+    private final NonSyncDataInputBuffer[] colValLenBufferReadIn;
+
+    /** Create a new RCFile reader. */
+    public Reader(FileSystem fs, Path file, Configuration conf) throws IOException {
+      this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, 0, fs
+          .getFileStatus(file).getLen());
+    }
+
+    /** Create a new RCFile reader. */
+    public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
+        long start, long length) throws IOException {
+      tolerateCorruptions = conf.getBoolean(
+        TOLERATE_CORRUPTIONS_CONF_STR, false);
+      conf.setInt("io.file.buffer.size", bufferSize);
+      this.file = file;
+      in = openFile(fs, file, bufferSize, length);
+      this.conf = conf;
+      end = start + length;
+      boolean succeed = false;
+      try {
+        if (start > 0) {
+          seek(0);
+          init();
+          seek(start);
+        } else {
+          init();
+        }
+        succeed = true;
+      } finally {
+        if (!succeed) {
+          if (in != null) {
+            try {
+              in.close();
+            } catch(IOException e) {
+              if (LOG != null && LOG.isDebugEnabled()) {
+                LOG.debug("Exception in closing " + in, e);
+              }
+            }
+          }
+        }
+      }
+
+      columnNumber = Integer.parseInt(metadata.get(
+          new Text(COLUMN_NUMBER_METADATA_STR)).toString());
+
+      List<Integer> notSkipIDs = ColumnProjectionUtils
+          .getReadColumnIDs(conf);
+      boolean[] skippedColIDs = new boolean[columnNumber];
+      if(ColumnProjectionUtils.isReadAllColumns(conf)) {
+        Arrays.fill(skippedColIDs, false);
+      } else if (notSkipIDs.size() > 0) {
+        Arrays.fill(skippedColIDs, true);
+        for (int read : notSkipIDs) {
+          if (read < columnNumber) {
+            skippedColIDs[read] = false;
+          }
+        }
+      } else {
+        // select count(1)
+        Arrays.fill(skippedColIDs, true);
+      }
+      loadColumnNum = columnNumber;
+      if (skippedColIDs.length > 0) {
+        for (boolean skippedColID : skippedColIDs) {
+          if (skippedColID) {
+            loadColumnNum -= 1;
+          }
+        }
+      }
+
+      revPrjColIDs = new int[columnNumber];
+      // get list of selected column IDs
+      selectedColumns = new SelectedColumn[loadColumnNum];
+      colValLenBufferReadIn = new NonSyncDataInputBuffer[loadColumnNum];
+      for (int i = 0, j = 0; i < columnNumber; ++i) {
+        if (!skippedColIDs[i]) {
+          SelectedColumn col = new SelectedColumn();
+          col.colIndex = i;
+          col.runLength = 0;
+          col.prvLength = -1;
+          col.rowReadIndex = 0;
+          selectedColumns[j] = col;
+          colValLenBufferReadIn[j] = new NonSyncDataInputBuffer();
+          revPrjColIDs[i] = j;
+          j++;
+        } else {
+          revPrjColIDs[i] = -1;
+        }
+      }
+
+      currentKey = createKeyBuffer();
+      boolean lazyDecompress = !tolerateCorruptions;
+      currentValue = new ValueBuffer(
+        null, columnNumber, skippedColIDs, codec, lazyDecompress);
+    }
+
+    /**
+     * Return the metadata (Text to Text map) that was written into the
+     * file.
+     */
+    public Metadata getMetadata() {
+      return metadata;
+    }
+
+    /**
+     * Return the metadata value associated with the given key.
+     * @param key the metadata key to retrieve
+     */
+    public Text getMetadataValueOf(Text key) {
+      return metadata.get(key);
+    }
+
+    /**
+     * Override this method to specialize the type of
+     * {@link FSDataInputStream} returned.
+     */
+    protected FSDataInputStream openFile(FileSystem fs, Path file,
+        int bufferSize, long length) throws IOException {
+      return fs.open(file, bufferSize);
+    }
+
+    private void init() throws IOException {
+      byte[] magic = new byte[MAGIC.length];
+      in.readFully(magic);
+
+      if (Arrays.equals(magic, ORIGINAL_MAGIC)) {
+        byte vers = in.readByte();
+        if (vers != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
+          throw new IOException(file + " is a version " + vers +
+                                " SequenceFile instead of an RCFile.");
+        }
+        version = ORIGINAL_VERSION;
+      } else {
+        if (!Arrays.equals(magic, MAGIC)) {
+          throw new IOException(file + " not a RCFile and has magic of " +
+                                new String(magic));
+        }
+
+        // Set 'version'
+        version = in.readByte();
+        if (version > CURRENT_VERSION) {
+          throw new VersionMismatchException((byte) CURRENT_VERSION, version);
+        }
+      }
+
+      if (version == ORIGINAL_VERSION) {
+        try {
+          Class<?> keyCls = conf.getClassByName(Text.readString(in));
+          Class<?> valCls = conf.getClassByName(Text.readString(in));
+          if (!keyCls.equals(KeyBuffer.class)
+              || !valCls.equals(ValueBuffer.class)) {
+            throw new IOException(file + " not a RCFile");
+          }
+        } catch (ClassNotFoundException e) {
+          throw new IOException(file + " not a RCFile", e);
+        }
+      }
+
+      decompress = in.readBoolean(); // is compressed?
+
+      if (version == ORIGINAL_VERSION) {
+        // is block-compressed? it should be always false.
+        boolean blkCompressed = in.readBoolean();
+        if (blkCompressed) {
+          throw new IOException(file + " not a RCFile.");
+        }
+      }
+
+      // setup the compression codec
+      if (decompress) {
+        String codecClassname = Text.readString(in);
+        try {
+          Class<? extends CompressionCodec> codecClass = conf.getClassByName(
+              codecClassname).asSubclass(CompressionCodec.class);
+          codec = ReflectionUtils.newInstance(codecClass, conf);
+        } catch (ClassNotFoundException cnfe) {
+          throw new IllegalArgumentException(
+              "Unknown codec: " + codecClassname, cnfe);
+        }
+        keyDecompressor = CodecPool.getDecompressor(codec);
+      }
+
+      metadata = new Metadata();
+      metadata.readFields(in);
+
+      in.readFully(sync); // read sync bytes
+      headerEnd = in.getPos();
+    }
+
+    /** Return the current byte position in the input file. */
+    public synchronized long getPosition() throws IOException {
+      return in.getPos();
+    }
+
+    /**
+     * Set the current byte position in the input file.
+     *
+     * <p>
+     * The position passed must be a position returned by
+     * {@link RCFile.Writer#getLength()} when writing this file. In other
+     * words, this method can only seek to positions that were recorded while
+     * writing the file. To seek to an arbitrary position, use
+     * {@link RCFile.Reader#sync(long)}.
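+     * <p>
+     * For example, to start reading at an arbitrary split offset
+     * {@code splitStart} (a hypothetical value), call
+     * {@code reader.sync(splitStart)} instead of
+     * {@code reader.seek(splitStart)}.
+     * </p>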
+     */
+    public synchronized void seek(long position) throws IOException {
+      in.seek(position);
+    }
+
+    /**
+     * Resets the values which determine if there are more rows in the buffer.
+     *
+     * This can be used after one calls seek or sync, if one called next
+     * before that. Otherwise, the seek or sync will have no effect: the
+     * reader will continue to return rows from the buffer built up by the
+     * call to next.
+     */
+    public synchronized void resetBuffer() {
+      readRowsIndexInBuffer = 0;
+      recordsNumInValBuffer = 0;
+    }
+
+    /** Seek to the next sync mark past a given position. */
+    public synchronized void sync(long position) throws IOException {
+      if (position + SYNC_SIZE >= end) {
+        seek(end);
+        return;
+      }
+
+      // this is to handle sync(pos) where pos < headerEnd.
+      if (position < headerEnd) {
+        // seek directly to first record
+        in.seek(headerEnd);
+        // note the sync marker "seen" in the header
+        syncSeen = true;
+        return;
+      }
+
+      try {
+        seek(position + 4); // skip escape
+
+        int prefix = sync.length;
+        int n = conf.getInt("io.bytes.per.checksum", 512);
+        byte[] buffer = new byte[prefix+n];
+        n = (int)Math.min(n, end - in.getPos());
+        /* fill array with a pattern that will never match sync */
+        Arrays.fill(buffer, (byte)(~sync[0]));
+        while(n > 0 && (in.getPos() + n) <= end) {
+          position = in.getPos();
+          in.readFully(buffer, prefix, n);
+          /* the buffer has n+sync bytes */
+          for(int i = 0; i < n; i++) {
+            int j;
+            for(j = 0; j < sync.length && sync[j] == buffer[i+j]; j++) {
+              /* nothing */
+            }
+            if(j == sync.length) {
+            /* simplified from (position + (i - prefix) + sync.length) - SYNC_SIZE */
+              in.seek(position + i - SYNC_SIZE);
+              return;
+            }
+          }
+          /* move the last 16 bytes to the prefix area */
+          System.arraycopy(buffer, buffer.length - prefix, buffer, 0, prefix);
+          n = (int)Math.min(n, end - in.getPos());
+        }
+      } catch (ChecksumException e) { // checksum failure
+        handleChecksumException(e);
+      }
+    }
+
+    private void handleChecksumException(ChecksumException e) throws IOException {
+      if (conf.getBoolean("io.skip.checksum.errors", false)) {
+        LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries.");
+        sync(getPosition() + conf.getInt("io.bytes.per.checksum", 512));
+      } else {
+        throw e;
+      }
+    }
+
+    private KeyBuffer createKeyBuffer() {
+      return new KeyBuffer(columnNumber);
+    }
+
+    /**
+     * Read and return the next record length, potentially skipping over a sync
+     * block.
+     *
+     * @return the length of the next record or -1 if there is no next record
+     * @throws IOException
+     */
+    private synchronized int readRecordLength() throws IOException {
+      if (in.getPos() >= end) {
+        return -1;
+      }
+      int length = in.readInt();
+      if (sync != null && length == SYNC_ESCAPE) { // process a sync entry
+        lastSeenSyncPos = in.getPos() - 4; // minus SYNC_ESCAPE's length
+        in.readFully(syncCheck); // read syncCheck
+        if (!Arrays.equals(sync, syncCheck)) {
+          throw new IOException("File is corrupt!");
+        }
+        syncSeen = true;
+        if (in.getPos() >= end) {
+          return -1;
+        }
+        length = in.readInt(); // re-read length
+      } else {
+        syncSeen = false;
+      }
+      return length;
+    }
+
+    private void seekToNextKeyBuffer() throws IOException {
+      if (!keyInit) {
+        return;
+      }
+      if (!currentValue.inited) {
+        in.skip(currentRecordLength - currentKeyLength);
+      }
+    }
+
+    private int compressedKeyLen = 0;
+    NonSyncDataInputBuffer keyDataIn = new NonSyncDataInputBuffer();
+    NonSyncDataInputBuffer keyDecompressBuffer = new NonSyncDataInputBuffer();
+    NonSyncDataOutputBuffer keyTempBuffer = new NonSyncDataOutputBuffer();
+
+    KeyBuffer currentKey = null;
+    boolean keyInit = false;
+
+    protected int nextKeyBuffer() throws IOException {
+      seekToNextKeyBuffer();
+      currentRecordLength = readRecordLength();
+      if (currentRecordLength == -1) {
+        keyInit = false;
+        return -1;
+      }
+      currentKeyLength = in.readInt();
+      compressedKeyLen = in.readInt();
+      if (decompress) {
+        keyTempBuffer.reset();
+        keyTempBuffer.write(in, compressedKeyLen);
+        keyDecompressBuffer.reset(keyTempBuffer.getData(), compressedKeyLen);
+        CompressionInputStream deflatFilter = codec.createInputStream(
+            keyDecompressBuffer, keyDecompressor);
+        DataInputStream compressedIn = new DataInputStream(deflatFilter);
+        deflatFilter.resetState();
+        keyDecompressedData.reset();
+        keyDecompressedData.write(compressedIn, currentKeyLength);
+        keyDataIn.reset(keyDecompressedData.getData(), currentKeyLength);
+        currentKey.readFields(keyDataIn);
+      } else {
+        currentKey.readFields(in);
+      }
+
+      keyInit = true;
+      currentValue.inited = false;
+
+      readRowsIndexInBuffer = 0;
+      recordsNumInValBuffer = currentKey.numberRows;
+
+      for (int selIx = 0; selIx < selectedColumns.length; selIx++) {
+        SelectedColumn col = selectedColumns[selIx];
+        int colIx = col.colIndex;
+        NonSyncDataOutputBuffer buf = currentKey.allCellValLenBuffer[colIx];
+        colValLenBufferReadIn[selIx].reset(buf.getData(), buf.getLength());
+        col.rowReadIndex = 0;
+        col.runLength = 0;
+        col.prvLength = -1;
+        col.isNulled = colValLenBufferReadIn[selIx].getLength() == 0;
+      }
+
+      return currentKeyLength;
+    }
+
+    protected void currentValueBuffer() throws IOException {
+      if (!keyInit) {
+        nextKeyBuffer();
+      }
+      currentValue.keyBuffer = currentKey;
+      currentValue.clearColumnBuffer();
+      currentValue.readFields(in);
+      currentValue.inited = true;
+    }
+
+    public boolean nextBlock() throws IOException {
+      int keyLength = nextKeyBuffer();
+      if(keyLength > 0) {
+        currentValueBuffer();
+        return true;
+      }
+      return false;
+    }
+
+    private boolean rowFetched = false;
+
+    // use this buffer to hold the value lengths of a column's cells for use
+    // in getColumn(), instead of using colValLenBufferReadIn directly.
+    private final NonSyncDataInputBuffer fetchColumnTempBuf = new NonSyncDataInputBuffer();
+
+    /**
+     * Fetch all data in the buffer for a given column. This is useful for
+     * columnar operators, which operate on an array of one column's data.
+     * It should be used together with {@link #nextColumnsBatch()}.
+     * Calling getColumn() will not change the result of
+     * {@link #next(LongWritable)} and
+     * {@link #getCurrentRow(BytesRefArrayWritable)}.
+     *
+     * @param columnID the index of the column to fetch, 0 to N-1
+     * @throws IOException
+     */
+    public BytesRefArrayWritable getColumn(int columnID,
+        BytesRefArrayWritable rest) throws IOException {
+      int selColIdx = revPrjColIDs[columnID];
+      if (selColIdx == -1) {
+        return null;
+      }
+
+      if (rest == null) {
+        rest = new BytesRefArrayWritable();
+      }
+
+      rest.resetValid(recordsNumInValBuffer);
+
+      if (!currentValue.inited) {
+        currentValueBuffer();
+      }
+
+      int columnNextRowStart = 0;
+      fetchColumnTempBuf.reset(currentKey.allCellValLenBuffer[columnID]
+          .getData(), currentKey.allCellValLenBuffer[columnID].getLength());
+      SelectedColumn selCol = selectedColumns[selColIdx];
+      byte[] uncompData = null;
+      ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null;
+      boolean decompressed = currentValue.decompressedFlag[selColIdx];
+      if (decompressed) {
+        uncompData =
+              currentValue.loadedColumnsValueBuffer[selColIdx].getData();
+      } else {
+        decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx];
+      }
+      for (int i = 0; i < recordsNumInValBuffer; i++) {
+        colAdvanceRow(selColIdx, selCol);
+        int length = selCol.prvLength;
+
+        BytesRefWritable currentCell = rest.get(i);
+
+        if (decompressed) {
+          currentCell.set(uncompData, columnNextRowStart, length);
+        } else {
+          currentCell.set(decompCallBack, columnNextRowStart, length);
+        }
+        columnNextRowStart = columnNextRowStart + length;
+      }
+      return rest;
+    }
+
+    /**
+     * Read in the next key buffer and throw away any data in the current key
+     * buffer and current value buffer. It will influence the result of
+     * {@link #next(LongWritable)} and
+     * {@link #getCurrentRow(BytesRefArrayWritable)}.
+     *
+     * @return whether there are still records to read
+     * @throws IOException
+     */
+    @SuppressWarnings("unused")
+    @Deprecated
+    public synchronized boolean nextColumnsBatch() throws IOException {
+      passedRowsNum += (recordsNumInValBuffer - readRowsIndexInBuffer);
+      return nextKeyBuffer() > 0;
+    }
+
+    /**
+     * Sets readRows to how many rows have been fetched with next(). It only
+     * counts rows read by next(); the reported count may be smaller than the
+     * actual number of rows passed by, because {@link #seek(long)} and
+     * {@link #nextColumnsBatch()} can change the underlying key buffer and
+     * value buffer.
+     *
+     * @return true if a next row exists, false otherwise
+     * @throws IOException
+     */
+    public synchronized boolean next(LongWritable readRows) throws IOException {
+      if (hasRecordsInBuffer()) {
+        readRows.set(passedRowsNum);
+        readRowsIndexInBuffer++;
+        passedRowsNum++;
+        rowFetched = false;
+        return true;
+      } else {
+        keyInit = false;
+      }
+
+      int ret = -1;
+      if (tolerateCorruptions) {
+        ret = nextKeyValueTolerateCorruptions();
+      } else {
+        try {
+          ret = nextKeyBuffer();
+        } catch (EOFException eof) {
+          eof.printStackTrace();
+        }
+      }
+      return (ret > 0) && next(readRows);
+    }
+
+    private int nextKeyValueTolerateCorruptions() throws IOException {
+      long currentOffset = in.getPos();
+      int ret;
+      try {
+        ret = nextKeyBuffer();
+        this.currentValueBuffer();
+      } catch (IOException ioe) {
+        // A BlockMissingException indicates a temporary error,
+        // not a corruption. Re-throw this exception.
+        String msg = ioe.getMessage();
+        if (msg != null && msg.startsWith(BLOCK_MISSING_MESSAGE)) {
+          LOG.warn("Re-throwing block-missing exception" + ioe);
+          throw ioe;
+        }
+        // We have an IOException other than a BlockMissingException.
+        LOG.warn("Ignoring IOException in file " + file +
+                 " after offset " + currentOffset, ioe);
+        ret = -1;
+      } catch (Throwable t) {
+        // We got an exception that is not IOException
+        // (typically OOM, IndexOutOfBounds, InternalError).
+        // This is most likely a corruption.
+        LOG.warn("Ignoring unknown error in " + file +
+                 " after offset " + currentOffset, t);
+        ret = -1;
+      }
+      return ret;
+    }
+
+    public boolean hasRecordsInBuffer() {
+      return readRowsIndexInBuffer < recordsNumInValBuffer;
+    }
+
+    /**
+     * Get the current row; make sure {@link #next(LongWritable)} was called
+     * first.
+     *
+     * @throws IOException
+     */
+    public synchronized void getCurrentRow(BytesRefArrayWritable ret) throws IOException {
+
+      if (!keyInit || rowFetched) {
+        return;
+      }
+
+      if (tolerateCorruptions) {
+        if (!currentValue.inited) {
+          currentValueBuffer();
+        }
+        ret.resetValid(columnNumber);
+      } else {
+        if (!currentValue.inited) {
+          currentValueBuffer();
+          // do this only when not initialized, but we may need to find a way
+          // to tell the caller how to initialize the valid size
+          ret.resetValid(columnNumber);
+        }
+      }
+
+      // we do not use BytesWritable here to avoid the byte-copy from
+      // DataOutputStream to BytesWritable
+      if (currentValue.numCompressed > 0) {
+        for (int j = 0; j < selectedColumns.length; ++j) {
+          SelectedColumn col = selectedColumns[j];
+          int i = col.colIndex;
+
+          if (col.isNulled) {
+            ret.set(i, null);
+          } else {
+            BytesRefWritable ref = ret.unCheckedGet(i);
+
+            colAdvanceRow(j, col);
+
+            if (currentValue.decompressedFlag[j]) {
+              ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+                  col.rowReadIndex, col.prvLength);
+            } else {
+              ref.set(currentValue.lazyDecompressCallbackObjs[j],
+                  col.rowReadIndex, col.prvLength);
+            }
+            col.rowReadIndex += col.prvLength;
+          }
+        }
+      } else {
+        // This version of the loop eliminates a condition check and branch
+        // and is measurably faster (20% or so)
+        for (int j = 0; j < selectedColumns.length; ++j) {
+          SelectedColumn col = selectedColumns[j];
+          int i = col.colIndex;
+
+          if (col.isNulled) {
+            ret.set(i, null);
+          } else {
+            BytesRefWritable ref = ret.unCheckedGet(i);
+
+            colAdvanceRow(j, col);
+            ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+                  col.rowReadIndex, col.prvLength);
+            col.rowReadIndex += col.prvLength;
+          }
+        }
+      }
+      rowFetched = true;
+    }
+
+    /**
+     * Advance column state to the next row: update offsets, run lengths, etc.
+     * @param selCol - index among selectedColumns
+     * @param col - column object to update the state of.  prvLength will be
+     *        set to the new read position
+     * @throws IOException
+     */
+    private void colAdvanceRow(int selCol, SelectedColumn col) throws IOException {
+      if (col.runLength > 0) {
+        --col.runLength;
+      } else {
+        int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[selCol]);
+        if (length < 0) {
+          // we reached a run-length marker here; reuse the previous length
+          // and reset the run length
+          col.runLength = (~length) - 1;
+        } else {
+          col.prvLength = length;
+          col.runLength = 0;
+        }
+      }
+    }
+
+    /** Returns true iff the previous call to next passed a sync mark. */
+    @SuppressWarnings("unused")
+    public boolean syncSeen() {
+      return syncSeen;
+    }
+
+    /** Returns the last seen sync position. */
+    public long lastSeenSyncPos() {
+      return lastSeenSyncPos;
+    }
+
+    /** Returns the name of the file. */
+    @Override
+    public String toString() {
+      return file.toString();
+    }
+
+    @SuppressWarnings("unused")
+    public boolean isCompressedRCFile() {
+      return this.decompress;
+    }
+
+    /** Close the reader. */
+    public void close() {
+      IOUtils.closeStream(in);
+      currentValue.close();
+      if (decompress) {
+        IOUtils.closeStream(keyDecompressedData);
+        if (keyDecompressor != null) {
+          // Make sure we only return keyDecompressor once.
+          CodecPool.returnDecompressor(keyDecompressor);
+          keyDecompressor = null;
+        }
+      }
+    }
+
+    /**
+     * Return the KeyBuffer object used in the reader. Internally in each
+     * reader, there is only one KeyBuffer object, which gets reused for every
+     * block.
+     */
+    public KeyBuffer getCurrentKeyBufferObj() {
+      return this.currentKey;
+    }
+
+    /**
+     * Return the ValueBuffer object used in the reader. Internally in each
+     * reader, there is only one ValueBuffer object, which gets reused for
+     * every block.
+     */
+    public ValueBuffer getCurrentValueBufferObj() {
+      return this.currentValue;
+    }
+
+    //return the current block's length
+    public int getCurrentBlockLength() {
+      return this.currentRecordLength;
+    }
+
+    //return the current block's key length
+    public int getCurrentKeyLength() {
+      return this.currentKeyLength;
+    }
+
+    //return the current block's compressed key length
+    public int getCurrentCompressedKeyLen() {
+      return this.compressedKeyLen;
+    }
+
+    //return the CompressionCodec used for this file
+    public CompressionCodec getCompressionCodec() {
+      return this.codec;
+    }
+
+  }
+}
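
For orientation, a minimal read-side sketch of the Reader API added above; the path is hypothetical and the snippet is an illustration, not part of the commit. It uses only calls visible in the patch: the Reader constructor, next(), getCurrentRow(), and close().

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.asterix.hivecompat.io.RCFile;

    public class RCFileScanSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/example.rcfile"); // hypothetical input file
        FileSystem fs = path.getFileSystem(conf);

        RCFile.Reader reader = new RCFile.Reader(fs, path, conf);
        LongWritable rowID = new LongWritable();
        BytesRefArrayWritable cols = new BytesRefArrayWritable();
        try {
          while (reader.next(rowID)) {   // advance the row cursor
            reader.getCurrentRow(cols);  // lazily materialize the selected columns
            // cols.get(i) now references column i's bytes for this row
          }
        } finally {
          reader.close();                // returns the key decompressor to the pool
        }
      }
    }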

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileInputFormat.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileInputFormat.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileInputFormat.java
new file mode 100644
index 0000000..19a8f8a
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileInputFormat.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * RCFileInputFormat.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class RCFileInputFormat<K extends LongWritable, V extends 
BytesRefArrayWritable>
+    extends FileInputFormat<K, V> implements InputFormatChecker {
+
+  public RCFileInputFormat() {
+    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+
+    reporter.setStatus(split.toString());
+
+    return new RCFileRecordReader(job, (FileSplit) split);
+  }
+
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf,
+      ArrayList<FileStatus> files) throws IOException {
+    if (files.size() <= 0) {
+      return false;
+    }
+    for (int fileId = 0; fileId < files.size(); fileId++) {
+      RCFile.Reader reader = null;
+      try {
+        reader = new RCFile.Reader(fs, files.get(fileId)
+            .getPath(), conf);
+        reader.close();
+        reader = null;
+      } catch (IOException e) {
+        return false;
+      } finally {
+        if (null != reader) {
+          reader.close();
+        }
+      }
+    }
+    return true;
+  }
+}
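
As a usage note (illustrative, not part of the commit), a consumer wires this class into an old-API MapReduce job roughly as follows; the driver class name and input directory are placeholders. Keys arrive as running row counts, values as each row's column bytes.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.asterix.hivecompat.io.RCFileInputFormat;

    public class RCFileJobSketch {
      public static JobConf configure() {
        JobConf job = new JobConf(RCFileJobSketch.class);
        // The record reader syncs to the first RCFile sync mark past each
        // split's start, so splits need no manual alignment.
        job.setInputFormat(RCFileInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("/data/rcfiles")); // placeholder
        // mapper/reducer setup elided
        return job;
      }
    }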

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileRecordReader.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileRecordReader.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileRecordReader.java
new file mode 100644
index 0000000..6ddba27
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/RCFileRecordReader.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.asterix.hivecompat.io.RCFile.KeyBuffer;
+import org.apache.asterix.hivecompat.io.RCFile.Reader;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * RCFileRecordReader.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class RCFileRecordReader<K extends LongWritable, V extends 
BytesRefArrayWritable>
+    implements RecordReader<LongWritable, BytesRefArrayWritable> {
+
+  private final Reader in;
+  private final long start;
+  private final long end;
+  private boolean more = true;
+  protected Configuration conf;
+  private final FileSplit split;
+  private final boolean useCache;
+
+  private static RCFileSyncCache syncCache = new RCFileSyncCache();
+
+  private static final class RCFileSyncEntry {
+    long end;
+    long endSync;
+  }
+
+  private static final class RCFileSyncCache {
+
+    private final Map<String, RCFileSyncEntry> cache;
+
+    public RCFileSyncCache() {
+       cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>());
+    }
+
+    public void put(FileSplit split, long endSync) {
+      Path path = split.getPath();
+      long end = split.getStart() + split.getLength();
+      String key = path.toString()+"+"+String.format("%d",end);
+
+      RCFileSyncEntry entry = new RCFileSyncEntry();
+      entry.end = end;
+      entry.endSync = endSync;
+      if(entry.endSync >= entry.end) {
+        cache.put(key, entry);
+      }
+    }
+
+    public long get(FileSplit split) {
+      Path path = split.getPath();
+      long start = split.getStart();
+      String key = path.toString()+"+"+String.format("%d",start);
+      RCFileSyncEntry entry = cache.get(key);
+      if(entry != null) {
+        return entry.endSync;
+      }
+      return -1;
+    }
+  }
+
+  public RCFileRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+
+    Path path = split.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    this.in = new RCFile.Reader(fs, path, conf);
+    this.end = split.getStart() + split.getLength();
+    this.conf = conf;
+    this.split = split;
+
+    useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE);
+
+    if (split.getStart() > in.getPosition()) {
+      long oldSync = useCache ? syncCache.get(split) : -1;
+      if(oldSync == -1) {
+        in.sync(split.getStart()); // sync to start
+      } else {
+        in.seek(oldSync);
+      }
+    }
+
+    this.start = in.getPosition();
+
+    more = start < end;
+  }
+
+  public Class<?> getKeyClass() {
+    return LongWritable.class;
+  }
+
+  public Class<?> getValueClass() {
+    return BytesRefArrayWritable.class;
+  }
+
+  public LongWritable createKey() {
+    return (LongWritable) ReflectionUtils.newInstance(getKeyClass(), conf);
+  }
+
+  public BytesRefArrayWritable createValue() {
+    return (BytesRefArrayWritable) ReflectionUtils.newInstance(getValueClass(),
+        conf);
+  }
+
+  public boolean nextBlock() throws IOException {
+    return in.nextBlock();
+  }
+
+  @Override
+  public boolean next(LongWritable key, BytesRefArrayWritable value)
+      throws IOException {
+
+    more = next(key);
+
+    if (more) {
+      in.getCurrentRow(value);
+    }
+    return more;
+  }
+
+  protected boolean next(LongWritable key) throws IOException {
+    if (!more) {
+      return false;
+    }
+
+    more = in.next(key);
+
+    long lastSeenSyncPos = in.lastSeenSyncPos();
+
+    if (lastSeenSyncPos >= end) {
+      if(useCache) {
+        syncCache.put(split, lastSeenSyncPos);
+      }
+      more = false;
+      return more;
+    }
+    return more;
+  }
+
+  /**
+   * Return the progress within the input split.
+   *
+   * @return 0.0 to 1.0 of the input byte range
+   */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
+    }
+  }
+
+  public long getPos() throws IOException {
+    return in.getPosition();
+  }
+
+  public KeyBuffer getKeyBuffer() {
+    return in.getCurrentKeyBufferObj();
+  }
+
+  protected void seek(long pos) throws IOException {
+    in.seek(pos);
+  }
+
+  public void sync(long pos) throws IOException {
+    in.sync(pos);
+  }
+
+  public void resetBuffer() {
+    in.resetBuffer();
+  }
+
+  public long getStart() {
+    return start;
+  }
+
+  public void close() throws IOException {
+    in.close();
+  }
+}
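
To read a single split without a full job, the record reader can also be driven by hand. Below is a sketch under the assumption that the whole file is one split (path handling is illustrative); note the constructor consults HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE, so hive-common must be on the classpath.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.asterix.hivecompat.io.RCFileRecordReader;

    public class RCFileSplitScanSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/example.rcfile"); // hypothetical input file
        FileSystem fs = path.getFileSystem(conf);
        long len = fs.getFileStatus(path).getLen();

        // One split covering the whole file; host hints are not needed here.
        FileSplit split = new FileSplit(path, 0, len, (String[]) null);
        RCFileRecordReader<LongWritable, BytesRefArrayWritable> rr =
            new RCFileRecordReader<LongWritable, BytesRefArrayWritable>(conf, split);
        LongWritable key = rr.createKey();
        BytesRefArrayWritable value = rr.createValue();
        while (rr.next(key, value)) {
          // key holds the running row count; value holds the projected columns
        }
        rr.close();
      }
    }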

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionInputStream.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionInputStream.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionInputStream.java
new file mode 100644
index 0000000..f1eb555
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionInputStream.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.io.compress.*;
+/**
+ *
+ * SchemaAwareCompressionInputStream adds the ability to inform the compression
+ * stream what column is being read.
+ *
+ */
+public abstract class SchemaAwareCompressionInputStream extends CompressionInputStream {
+
+  protected SchemaAwareCompressionInputStream(InputStream in) throws java.io.IOException {
+    super(in);
+  }
+
+  /**
+   * The column being read
+   *
+   * @param columnIndex the index of the column. Use -1 for non-column data
+   */
+  public abstract void setColumnIndex(int columnIndex);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionOutputStream.java b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionOutputStream.java
new file mode 100644
index 0000000..68f95a3
--- /dev/null
+++ b/asterixdb/asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/SchemaAwareCompressionOutputStream.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.hivecompat.io;
+
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.compress.*;
+
+/**
+ *
+ * SchemaAwareCompressionOutputStream adds the ability to inform the
+ * compression stream of the current column being compressed.
+ *
+ */
+public abstract class SchemaAwareCompressionOutputStream extends CompressionOutputStream {
+
+  protected SchemaAwareCompressionOutputStream(OutputStream out) {
+    super(out);
+  }
+
+  /**
+   *
+   * The column being output
+   *
+   * @param columnIndex the index of the column. Use -1 for non-column data
+   */
+  public abstract void setColumnIndex(int columnIndex);
+}
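
Concrete codecs hook into these two abstract classes by subclassing. The following pass-through sketch (hypothetical class name, no real decompression) shows the minimal surface a schema-aware input stream must implement; the output-stream counterpart is symmetric.

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.asterix.hivecompat.io.SchemaAwareCompressionInputStream;

    public class PassThroughColumnInputStream extends SchemaAwareCompressionInputStream {
      private int columnIndex = -1; // -1 means non-column data

      public PassThroughColumnInputStream(InputStream in) throws IOException {
        super(in);
      }

      @Override
      public void setColumnIndex(int columnIndex) {
        // A real codec could switch to a per-column dictionary or context here.
        this.columnIndex = columnIndex;
      }

      @Override
      public int read() throws IOException {
        return in.read(); // 'in' is the stream held by CompressionInputStream
      }

      @Override
      public int read(byte[] b, int off, int len) throws IOException {
        return in.read(b, off, len); // pass-through: no actual decompression
      }

      @Override
      public void resetState() throws IOException {
        // stateless pass-through: nothing to reset
      }
    }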

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/asterix-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-yarn/pom.xml b/asterixdb/asterix-yarn/pom.xml
index 4e6ce2a..ed476ca 100644
--- a/asterixdb/asterix-yarn/pom.xml
+++ b/asterixdb/asterix-yarn/pom.xml
@@ -153,8 +153,8 @@
                 <include>org.apache.httpcomponents:httpclient</include>
                 <include>org.htrace:htrace-core</include>
                 <include>commons-httpclient:commons-httpclient</include>
-                <include>com.google.guava:guava</include>
                 <include>com.google.protobuf:protobuf-java</include>
+                <include>com.google.guava:guava</include>
               </includes>
               <location>lib</location>
             </dependencySet>
@@ -201,7 +201,6 @@
             <ignoredDependency>org.apache.hive:hive-exec:*</ignoredDependency>
           </ignoredDependencies>
           <usedDependencies>
-            <usedDependency>com.google.guava:guava</usedDependency>
             <usedDependency>commons-codec:commons-codec</usedDependency>
             
<usedDependency>commons-collections:commons-collections</usedDependency>
             
<usedDependency>commons-configuration:commons-configuration</usedDependency>
@@ -459,7 +458,6 @@
     <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging-api</artifactId>
-      <version>1.0.4</version>
     </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5226ca87/asterixdb/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index a0f9633..043f801 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -601,6 +601,7 @@
     <module>asterix-runtime</module>
     <module>asterix-om</module>
     <module>asterix-external-data</module>
+    <module>asterix-hivecompat</module>
     <module>asterix-examples</module>
     <module>asterix-metadata</module>
     <module>asterix-test-framework</module>
@@ -681,6 +682,12 @@
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>${hadoop.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
@@ -716,6 +723,44 @@
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>org.apache.hive</groupId>
+        <artifactId>hive-serde</artifactId>
+        <version>0.13.0</version>
+        <exclusions>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>commons-logging</groupId>
+        <artifactId>commons-logging-api</artifactId>
+        <version>1.0.4</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hive.shims</groupId>
+        <artifactId>hive-shims-common</artifactId>
+        <version>0.13.0</version>
+        <exclusions>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hive</groupId>
+        <artifactId>hive-common</artifactId>
+        <version>0.13.0</version>
+        <exclusions>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hyracks</groupId>
         <artifactId>algebricks-common</artifactId>
         <version>${algebricks.version}</version>
