Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.0 c13b0562f -> 5e5c1af08


TAJO-1826: Revert "refine code for Parquet 1.8.1."

This reverts commit 18b898ffbab0462ae26c0cf374b119dcee5f1c6f.

Signed-off-by: Hyunsik Choi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5e5c1af0
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5e5c1af0
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5e5c1af0

Branch: refs/heads/branch-0.11.0
Commit: 5e5c1af080090bc65d937d3608546fe2b991a1b7
Parents: c13b056
Author: Jongyoung Park <[email protected]>
Authored: Wed Sep 9 18:38:06 2015 +0900
Committer: Hyunsik Choi <[email protected]>
Committed: Wed Sep 9 18:45:11 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 tajo-storage/tajo-storage-hdfs/pom.xml          |  16 +-
 .../tajo/storage/parquet/ParquetAppender.java   |   6 +-
 .../tajo/storage/parquet/TajoParquetReader.java |   4 +-
 .../tajo/storage/parquet/TajoParquetWriter.java |   4 +-
 .../tajo/storage/parquet/TajoReadSupport.java   |  10 +-
 .../storage/parquet/TajoRecordConverter.java    |  12 +-
 .../storage/parquet/TajoRecordMaterializer.java |   6 +-
 .../storage/parquet/TajoSchemaConverter.java    |  10 +-
 .../tajo/storage/parquet/TajoWriteSupport.java  |  12 +-
 .../thirdparty/parquet/CodecFactory.java        | 190 +++++++
 .../parquet/ColumnChunkPageWriteStore.java      | 206 ++++++++
 .../parquet/InternalParquetRecordReader.java    | 190 +++++++
 .../parquet/InternalParquetRecordWriter.java    | 160 ++++++
 .../thirdparty/parquet/ParquetFileWriter.java   | 492 +++++++++++++++++++
 .../thirdparty/parquet/ParquetReader.java       | 146 ++++++
 .../thirdparty/parquet/ParquetWriter.java       | 224 +++++++++
 .../storage/parquet/TestSchemaConverter.java    |   4 +-
 18 files changed, 1659 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c7679c8..9bb33bd 100644
--- a/CHANGES
+++ b/CHANGES
@@ -594,6 +594,9 @@ Release 0.11.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1826: Revert 'refining code for Parquet 1.8.1'. 
+    (Contributed by Jongyoung Park, committed by hyunsik)
+
     TAJO-1641: Add window function documentation. (jihoon)
 
     TAJO-1749: Refine JDBC exceptions to better handle exceptional 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml 
b/tajo-storage/tajo-storage-hdfs/pom.xml
index 0150f02..d47b001 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -34,6 +34,8 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <parquet.version>1.5.0</parquet.version>
+    <parquet.format.version>2.1.0</parquet.format.version>
   </properties>
 
   <repositories>
@@ -335,9 +337,19 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.parquet</groupId>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-column</artifactId>
+      <version>${parquet.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
       <artifactId>parquet-hadoop</artifactId>
-      <version>1.8.1</version>
+      <version>${parquet.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-format</artifactId>
+      <version>${parquet.format.version}</version>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index 6cb99d1..41e4269 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -21,8 +21,8 @@ package org.apache.tajo.storage.parquet;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.storage.StorageConstants;
-import org.apache.parquet.hadoop.ParquetOutputFormat;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -128,7 +128,7 @@ public class ParquetAppender extends FileAppender {
   }
 
   public long getEstimatedOutputSize() throws IOException {
-    return writer.getDataSize();
+    return writer.getEstimatedWrittenSize();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
index 1a6545f..a765f48 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
@@ -19,10 +19,10 @@
 package org.apache.tajo.storage.parquet;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.Tuple;
-import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.tajo.storage.thirdparty.parquet.ParquetReader;
+import parquet.filter.UnboundRecordFilter;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
index 8e6ae3e..5f220c5 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
@@ -19,10 +19,10 @@
 package org.apache.tajo.storage.parquet;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.Tuple;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
index 4a3300c..a64e987 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
@@ -21,11 +21,11 @@ package org.apache.tajo.storage.parquet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.Tuple;
-import org.apache.parquet.Log;
-import org.apache.parquet.hadoop.api.InitContext;
-import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.io.api.RecordMaterializer;
-import org.apache.parquet.schema.MessageType;
+import parquet.Log;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 43c55e1..7f236b6 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -27,12 +27,12 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.Converter;
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.PrimitiveConverter;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.Type;
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.GroupType;
+import parquet.schema.Type;
 
 import java.nio.ByteBuffer;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
index f762820..436159c 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
@@ -21,9 +21,9 @@ package org.apache.tajo.storage.parquet;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.Tuple;
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.RecordMaterializer;
-import org.apache.parquet.schema.MessageType;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
 
 /**
  * Materializes a Tajo Tuple from a stream of Parquet data.

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
index e0cf64b..555b623 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
@@ -21,11 +21,11 @@ package org.apache.tajo.storage.parquet;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.apache.parquet.schema.Type;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
 
 import java.util.ArrayList;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
index 9613a25..de2a1e3 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
@@ -24,12 +24,12 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
 import org.apache.tajo.storage.Tuple;
-import org.apache.parquet.hadoop.api.WriteSupport;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.RecordConsumer;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
 
 import java.util.HashMap;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
new file mode 100644
index 0000000..4ba47c1
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import parquet.bytes.BytesInput;
+import parquet.hadoop.BadConfigurationException;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+class CodecFactory {
+
+  public static class BytesDecompressor {
+
+    private final CompressionCodec codec;
+    private final Decompressor decompressor;
+
+    public BytesDecompressor(CompressionCodec codec) {
+      this.codec = codec;
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+      } else {
+        decompressor = null;
+      }
+    }
+
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) 
throws IOException {
+      final BytesInput decompressed;
+      if (codec != null) {
+        decompressor.reset();
+        InputStream is = codec.createInputStream(new 
ByteArrayInputStream(bytes.toByteArray()), decompressor);
+        decompressed = BytesInput.from(is, uncompressedSize);
+      } else {
+        decompressed = bytes;
+      }
+      return decompressed;
+    }
+
+    private void release() {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
+    }
+  }
+
+  /**
+   * Encapsulates the logic around hadoop compression
+   *
+   * @author Julien Le Dem
+   *
+   */
+  public static class BytesCompressor {
+
+    private final CompressionCodec codec;
+    private final Compressor compressor;
+    private final ByteArrayOutputStream compressedOutBuffer;
+    private final CompressionCodecName codecName;
+
+    public BytesCompressor(CompressionCodecName codecName, CompressionCodec 
codec, int pageSize) {
+      this.codecName = codecName;
+      this.codec = codec;
+      if (codec != null) {
+        this.compressor = CodecPool.getCompressor(codec);
+        this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
+      } else {
+        this.compressor = null;
+        this.compressedOutBuffer = null;
+      }
+    }
+
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      final BytesInput compressedBytes;
+      if (codec == null) {
+        compressedBytes = bytes;
+      } else {
+        compressedOutBuffer.reset();
+        if (compressor != null) {
+          // null compressor for non-native gzip
+          compressor.reset();
+        }
+        CompressionOutputStream cos = 
codec.createOutputStream(compressedOutBuffer, compressor);
+        bytes.writeAllTo(cos);
+        cos.finish();
+        cos.close();
+        compressedBytes = BytesInput.from(compressedOutBuffer);
+      }
+      return compressedBytes;
+    }
+
+    private void release() {
+      if (compressor != null) {
+        CodecPool.returnCompressor(compressor);
+      }
+    }
+
+    public CompressionCodecName getCodecName() {
+      return codecName;
+    }
+
+  }
+
+  private final Map<CompressionCodecName, BytesCompressor> compressors = new 
HashMap<CompressionCodecName, BytesCompressor>();
+  private final Map<CompressionCodecName, BytesDecompressor> decompressors = 
new HashMap<CompressionCodecName, BytesDecompressor>();
+  private final Map<String, CompressionCodec> codecByName = new 
HashMap<String, CompressionCodec>();
+  private final Configuration configuration;
+
+  public CodecFactory(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  /**
+   *
+   * @param codecName the requested codec
+   * @return the corresponding hadoop codec. null if UNCOMPRESSED
+   */
+  private CompressionCodec getCodec(CompressionCodecName codecName) {
+    String codecClassName = codecName.getHadoopCompressionCodecClassName();
+    if (codecClassName == null) {
+      return null;
+    }
+    CompressionCodec codec = codecByName.get(codecClassName);
+    if (codec != null) {
+      return codec;
+    }
+
+    try {
+      Class<?> codecClass = Class.forName(codecClassName);
+      codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, 
configuration);
+      codecByName.put(codecClassName, codec);
+      return codec;
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("Class " + codecClassName + " was 
not found", e);
+    }
+  }
+
+  public BytesCompressor getCompressor(CompressionCodecName codecName, int 
pageSize) {
+    BytesCompressor comp = compressors.get(codecName);
+    if (comp == null) {
+      CompressionCodec codec = getCodec(codecName);
+      comp = new BytesCompressor(codecName, codec, pageSize);
+      compressors.put(codecName, comp);
+    }
+    return comp;
+  }
+
+  public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
+    BytesDecompressor decomp = decompressors.get(codecName);
+    if (decomp == null) {
+      CompressionCodec codec = getCodec(codecName);
+      decomp = new BytesDecompressor(codec);
+      decompressors.put(codecName, decomp);
+    }
+    return decomp;
+  }
+
+  public void release() {
+    for (BytesCompressor compressor : compressors.values()) {
+      compressor.release();
+    }
+    compressors.clear();
+    for (BytesDecompressor decompressor : decompressors.values()) {
+      decompressor.release();
+    }
+    decompressors.clear();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
new file mode 100644
index 0000000..91d4748
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import parquet.Log;
+import parquet.bytes.BytesInput;
+import parquet.bytes.CapacityByteArrayOutputStream;
+import parquet.column.ColumnDescriptor;
+import parquet.column.Encoding;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.PageWriteStore;
+import parquet.column.page.PageWriter;
+import parquet.column.statistics.BooleanStatistics;
+import parquet.column.statistics.Statistics;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.io.ParquetEncodingException;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.*;
+
+import static 
org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
+import static parquet.Log.DEBUG;
+
+class ColumnChunkPageWriteStore implements PageWriteStore {
+  private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
+
+  private static ParquetMetadataConverter parquetMetadataConverter = new 
ParquetMetadataConverter();
+
+  private static final class ColumnChunkPageWriter implements PageWriter {
+
+    private final ColumnDescriptor path;
+    private final BytesCompressor compressor;
+
+    private final CapacityByteArrayOutputStream buf;
+    private DictionaryPage dictionaryPage;
+
+    private long uncompressedLength;
+    private long compressedLength;
+    private long totalValueCount;
+    private int pageCount;
+
+    private Set<Encoding> encodings = new HashSet<Encoding>();
+
+    private Statistics totalStatistics;
+
+    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor 
compressor, int initialSize) {
+      this.path = path;
+      this.compressor = compressor;
+      this.buf = new CapacityByteArrayOutputStream(initialSize);
+      this.totalStatistics = 
Statistics.getStatsBasedOnType(this.path.getType());
+    }
+
+    @Deprecated
+    @Override
+    public void writePage(BytesInput bytes,
+                          int valueCount,
+                          Encoding rlEncoding,
+                          Encoding dlEncoding,
+                          Encoding valuesEncoding) throws IOException {
+      long uncompressedSize = bytes.size();
+      BytesInput compressedBytes = compressor.compress(bytes);
+      long compressedSize = compressedBytes.size();
+      BooleanStatistics statistics = new BooleanStatistics(); // dummy stats 
object
+      parquetMetadataConverter.writeDataPageHeader(
+          (int)uncompressedSize,
+          (int)compressedSize,
+          valueCount,
+          statistics,
+          rlEncoding,
+          dlEncoding,
+          valuesEncoding,
+          buf);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      compressedBytes.writeAllTo(buf);
+      encodings.add(rlEncoding);
+      encodings.add(dlEncoding);
+      encodings.add(valuesEncoding);
+    }
+
+    @Override
+    public void writePage(BytesInput bytes,
+                          int valueCount,
+                          Statistics statistics,
+                          Encoding rlEncoding,
+                          Encoding dlEncoding,
+                          Encoding valuesEncoding) throws IOException {
+      long uncompressedSize = bytes.size();
+      BytesInput compressedBytes = compressor.compress(bytes);
+      long compressedSize = compressedBytes.size();
+      parquetMetadataConverter.writeDataPageHeader(
+          (int)uncompressedSize,
+          (int)compressedSize,
+          valueCount,
+          statistics,
+          rlEncoding,
+          dlEncoding,
+          valuesEncoding,
+          buf);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      this.totalStatistics.mergeStatistics(statistics);
+      compressedBytes.writeAllTo(buf);
+      encodings.add(rlEncoding);
+      encodings.add(dlEncoding);
+      encodings.add(valuesEncoding);
+    }
+
+    @Override
+    public long getMemSize() {
+      return buf.size();
+    }
+
+    public void writeToFileWriter(ParquetFileWriter writer) throws IOException 
{
+      writer.startColumn(path, totalValueCount, compressor.getCodecName());
+      if (dictionaryPage != null) {
+        writer.writeDictionaryPage(dictionaryPage);
+        encodings.add(dictionaryPage.getEncoding());
+      }
+      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, 
compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
+      writer.endColumn();
+      if (DEBUG) {
+        LOG.debug(
+            String.format(
+                "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d 
pages, encodings: %s",
+                buf.size(), path, totalValueCount, uncompressedLength, 
compressedLength, pageCount, encodings)
+                + (dictionaryPage != null ? String.format(
+                ", dic { %,d entries, %,dB raw, %,dB comp}",
+                dictionaryPage.getDictionarySize(), 
dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
+                : ""));
+      }
+      encodings.clear();
+      pageCount = 0;
+    }
+
+    @Override
+    public long allocatedSize() {
+      return buf.getCapacity();
+    }
+
+    @Override
+    public void writeDictionaryPage(DictionaryPage dictionaryPage) throws 
IOException {
+      if (this.dictionaryPage != null) {
+        throw new ParquetEncodingException("Only one dictionary page is 
allowed");
+      }
+      BytesInput dictionaryBytes = dictionaryPage.getBytes();
+      int uncompressedSize = (int)dictionaryBytes.size();
+      BytesInput compressedBytes = compressor.compress(dictionaryBytes);
+      this.dictionaryPage = new 
DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, 
dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
+    }
+
+    @Override
+    public String memUsageString(String prefix) {
+      return buf.memUsageString(prefix + " ColumnChunkPageWriter");
+    }
+  }
+
+  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new 
HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
+  private final MessageType schema;
+  private final BytesCompressor compressor;
+  private final int initialSize;
+
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType 
schema, int initialSize) {
+    this.compressor = compressor;
+    this.schema = schema;
+    this.initialSize = initialSize;
+  }
+
+  @Override
+  public PageWriter getPageWriter(ColumnDescriptor path) {
+    if (!writers.containsKey(path)) {
+      writers.put(path,  new ColumnChunkPageWriter(path, compressor, 
initialSize));
+    }
+    return writers.get(path);
+  }
+
+  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
+    List<ColumnDescriptor> columns = schema.getColumns();
+    for (ColumnDescriptor columnDescriptor : columns) {
+      ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor);
+      pageWriter.writeToFileWriter(writer);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
new file mode 100644
index 0000000..10ac6de
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.Log;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.PageReadStore;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.util.counters.BenchmarkCounter;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.io.ParquetDecodingException;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static parquet.Log.DEBUG;
+
+class InternalParquetRecordReader<T> {
+  private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
+
+  private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+
+  private MessageType requestedSchema;
+  private MessageType fileSchema;
+  private int columnCount;
+  private final ReadSupport<T> readSupport;
+
+  private RecordMaterializer<T> recordConverter;
+
+  private T currentValue;
+  private long total;
+  private int current = 0;
+  private int currentBlock = -1;
+  private ParquetFileReader reader;
+  private parquet.io.RecordReader<T> recordReader;
+  private UnboundRecordFilter recordFilter;
+
+  private long totalTimeSpentReadingBytes;
+  private long totalTimeSpentProcessingRecords;
+  private long startedAssemblingCurrentBlockAt;
+
+  private long totalCountLoadedSoFar = 0;
+
+  private Path file;
+
+  /**
+   * @param readSupport Object which helps reads files of the given type, e.g. 
Thrift, Avro.
+   */
+  public InternalParquetRecordReader(ReadSupport<T> readSupport) {
+    this(readSupport, null);
+  }
+
+  /**
+   * @param readSupport Object which helps reads files of the given type, e.g. 
Thrift, Avro.
+   * @param filter Optional filter for only returning matching records.
+   */
+  public InternalParquetRecordReader(ReadSupport<T> readSupport, 
UnboundRecordFilter
+      filter) {
+    this.readSupport = readSupport;
+    this.recordFilter = filter;
+  }
+
+  private void checkRead() throws IOException {
+    if (current == totalCountLoadedSoFar) {
+      if (current != 0) {
+        long timeAssembling = System.currentTimeMillis() - 
startedAssemblingCurrentBlockAt;
+        totalTimeSpentProcessingRecords += timeAssembling;
+        if (DEBUG) LOG.debug("Assembled and processed " + 
totalCountLoadedSoFar + " records from " + columnCount + " columns in " + 
totalTimeSpentProcessingRecords + " ms: " + ((float) totalCountLoadedSoFar / 
totalTimeSpentProcessingRecords) + " rec/ms, " + ((float) totalCountLoadedSoFar 
* columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+        long totalTime = totalTimeSpentProcessingRecords + 
totalTimeSpentReadingBytes;
+        long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
+        long percentProcessing = 100 * totalTimeSpentProcessingRecords / 
totalTime;
+        if (DEBUG) LOG.debug("time spent so far " + percentReading + "% 
reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% 
processing ("+totalTimeSpentProcessingRecords+" ms)");
+      }
+
+      if (DEBUG) LOG.debug("at row " + current + ". reading next block");
+      long t0 = System.currentTimeMillis();
+      PageReadStore pages = reader.readNextRowGroup();
+      if (pages == null) {
+        throw new IOException("expecting more rows but reached last block. 
Read " + current + " out of " + total);
+      }
+      long timeSpentReading = System.currentTimeMillis() - t0;
+      totalTimeSpentReadingBytes += timeSpentReading;
+      BenchmarkCounter.incrementTime(timeSpentReading);
+      if (DEBUG) {
+        LOG.debug("block read in memory in " + timeSpentReading + " ms. row 
count = " + pages.getRowCount());
+        LOG.debug("initializing Record assembly with requested schema " + 
requestedSchema);
+      }
+      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, 
fileSchema);
+      recordReader = columnIO.getRecordReader(pages, recordConverter, 
recordFilter);
+      startedAssemblingCurrentBlockAt = System.currentTimeMillis();
+      totalCountLoadedSoFar += pages.getRowCount();
+      ++ currentBlock;
+    }
+  }
+
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  public T getCurrentValue() throws IOException,
+      InterruptedException {
+    return currentValue;
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return (float) current / total;
+  }
+
+  public void initialize(MessageType requestedSchema, MessageType fileSchema,
+                         Map<String, String> extraMetadata, Map<String, 
String> readSupportMetadata,
+                         Path file, List<BlockMetaData> blocks, Configuration 
configuration)
+      throws IOException {
+    this.requestedSchema = requestedSchema;
+    this.fileSchema = fileSchema;
+    this.file = file;
+    this.columnCount = this.requestedSchema.getPaths().size();
+    this.recordConverter = readSupport.prepareForRead(
+        configuration, extraMetadata, fileSchema,
+        new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
+
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    reader = new ParquetFileReader(configuration, file, blocks, columns);
+    for (BlockMetaData block : blocks) {
+      total += block.getRowCount();
+    }
+    if (DEBUG) LOG.debug("RecordReader initialized will read a total of " + 
total + " records.");
+  }
+
+  private boolean contains(GroupType group, String[] path, int index) {
+    if (index == path.length) {
+      return false;
+    }
+    if (group.containsField(path[index])) {
+      Type type = group.getType(path[index]);
+      if (type.isPrimitive()) {
+        return index + 1 == path.length;
+      } else {
+        return contains(type.asGroupType(), path, index + 1);
+      }
+    }
+    return false;
+  }
+
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (current < total) {
+      try {
+        checkRead();
+        currentValue = recordReader.read();
+        if (DEBUG) LOG.debug("read value: " + currentValue);
+        current ++;
+      } catch (RuntimeException e) {
+        throw new ParquetDecodingException(format("Can not read value at %d in 
block %d in file %s", current, currentBlock, file), e);
+      }
+      return true;
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
new file mode 100644
index 0000000..da57745
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import parquet.Log;
+import parquet.column.ParquetProperties.WriterVersion;
+import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.lang.String.format;
+import static 
org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
+import static parquet.Log.DEBUG;
+import static parquet.Preconditions.checkNotNull;
+
+class InternalParquetRecordWriter<T> {
+  private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
+
+  private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
+  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
+  private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+
+  private final ParquetFileWriter w;
+  private final WriteSupport<T> writeSupport;
+  private final MessageType schema;
+  private final Map<String, String> extraMetaData;
+  private final int blockSize;
+  private final int pageSize;
+  private final BytesCompressor compressor;
+  private final int dictionaryPageSize;
+  private final boolean enableDictionary;
+  private final boolean validating;
+  private final WriterVersion writerVersion;
+
+  private long recordCount = 0;
+  private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+
+  private ColumnWriteStoreImpl store;
+  private ColumnChunkPageWriteStore pageStore;
+
+  /**
+   * @param w the file to write to
+   * @param writeSupport the class to convert incoming records
+   * @param schema the schema of the records
+   * @param extraMetaData extra meta data to write in the footer of the file
+   * @param blockSize the size of a block in the file (this will be 
approximate)
+   * @param codec the codec used to compress
+   */
+  public InternalParquetRecordWriter(
+      ParquetFileWriter w,
+      WriteSupport<T> writeSupport,
+      MessageType schema,
+      Map<String, String> extraMetaData,
+      int blockSize,
+      int pageSize,
+      BytesCompressor compressor,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      WriterVersion writerVersion) {
+    this.w = w;
+    this.writeSupport = checkNotNull(writeSupport, "writeSupport");
+    this.schema = schema;
+    this.extraMetaData = extraMetaData;
+    this.blockSize = blockSize;
+    this.pageSize = pageSize;
+    this.compressor = compressor;
+    this.dictionaryPageSize = dictionaryPageSize;
+    this.enableDictionary = enableDictionary;
+    this.validating = validating;
+    this.writerVersion = writerVersion;
+    initStore();
+  }
+
+  private void initStore() {
+    // we don't want this number to be too small
+    // ideally we divide the block equally across the columns
+    // it is unlikely all columns are going to be the same size.
+    int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / 
schema.getColumns().size() / 5);
+    pageStore = new ColumnChunkPageWriteStore(compressor, schema, 
initialBlockBufferSize);
+    // we don't want this number to be too small either
+    // ideally, slightly bigger than the page size, but not bigger than the 
block buffer
+    int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + 
pageSize / 10, initialBlockBufferSize));
+    store = new ColumnWriteStoreImpl(pageStore, pageSize, 
initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
+    MessageColumnIO columnIO = new 
ColumnIOFactory(validating).getColumnIO(schema);
+    writeSupport.prepareForWrite(columnIO.getRecordWriter(store));
+  }
+
+  public void close() throws IOException, InterruptedException {
+    flushStore();
+    w.end(extraMetaData);
+  }
+
+  public void write(T value) throws IOException, InterruptedException {
+    writeSupport.write(value);
+    ++ recordCount;
+    checkBlockSizeReached();
+  }
+
+  private void checkBlockSizeReached() throws IOException {
+    if (recordCount >= recordCountForNextMemCheck) { // checking the memory 
size is relatively expensive, so let's not do it for every record.
+      long memSize = store.memSize();
+      if (memSize > blockSize) {
+        if (DEBUG) LOG.debug(format("mem size %,d > %,d: flushing %,d records 
to disk.", memSize, blockSize, recordCount));
+        flushStore();
+        initStore();
+        recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, 
recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
+      } else {
+        float recordSize = (float) memSize / recordCount;
+        recordCountForNextMemCheck = min(
+            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + 
(long)(blockSize / recordSize)) / 2), // will check halfway
+            recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more 
than max records ahead
+        );
+        if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: 
%,d ", recordCount, recordCountForNextMemCheck));
+      }
+    }
+  }
+
+  public long getEstimatedWrittenSize() throws IOException {
+    return w.getPos() + store.memSize();
+  }
+
+  private void flushStore()
+      throws IOException {
+    if (DEBUG) LOG.debug(format("Flushing mem store to file. allocated memory: 
%,d", store.allocatedSize()));
+    if (store.allocatedSize() > 3 * blockSize) {
+      LOG.warn("Too much memory used: " + store.memUsageString());
+    }
+    w.startBlock(recordCount);
+    store.flush();
+    pageStore.flushToFileWriter(w);
+    recordCount = 0;
+    w.endBlock();
+    store = null;
+    pageStore = null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
new file mode 100644
index 0000000..ac1c421
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.Log;
+import parquet.Version;
+import parquet.bytes.BytesInput;
+import parquet.bytes.BytesUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.DictionaryPage;
+import parquet.column.statistics.Statistics;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.Footer;
+import parquet.hadoop.metadata.*;
+import parquet.io.ParquetEncodingException;
+import parquet.schema.MessageType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.Map.Entry;
+
+import static parquet.Log.DEBUG;
+import static parquet.format.Util.writeFileMetaData;
+
+/**
+ * Internal implementation of the Parquet file writer as a block container
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetFileWriter {
+  private static final Log LOG = Log.getLog(ParquetFileWriter.class);
+
+  public static final String PARQUET_METADATA_FILE = "_metadata";
+  public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
+  public static final int CURRENT_VERSION = 1;
+
+  private static final ParquetMetadataConverter metadataConverter = new 
ParquetMetadataConverter();
+
+  private final MessageType schema;
+  private final FSDataOutputStream out;
+  private BlockMetaData currentBlock;
+  private ColumnChunkMetaData currentColumn;
+  private long currentRecordCount;
+  private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+  private long uncompressedLength;
+  private long compressedLength;
+  private Set<parquet.column.Encoding> currentEncodings;
+
+  private CompressionCodecName currentChunkCodec;
+  private ColumnPath currentChunkPath;
+  private PrimitiveTypeName currentChunkType;
+  private long currentChunkFirstDataPage;
+  private long currentChunkDictionaryPageOffset;
+  private long currentChunkValueCount;
+
+  private Statistics currentStatistics;
+
+  /**
+   * Captures the order in which methods should be called
+   *
+   * @author Julien Le Dem
+   *
+   */
+  private enum STATE {
+    NOT_STARTED {
+      STATE start() {
+        return STARTED;
+      }
+    },
+    STARTED {
+      STATE startBlock() {
+        return BLOCK;
+      }
+      STATE end() {
+        return ENDED;
+      }
+    },
+    BLOCK  {
+      STATE startColumn() {
+        return COLUMN;
+      }
+      STATE endBlock() {
+        return STARTED;
+      }
+    },
+    COLUMN {
+      STATE endColumn() {
+        return BLOCK;
+      };
+      STATE write() {
+        return this;
+      }
+    },
+    ENDED;
+
+    STATE start() throws IOException { return error(); }
+    STATE startBlock() throws IOException { return error(); }
+    STATE startColumn() throws IOException { return error(); }
+    STATE write() throws IOException { return error(); }
+    STATE endColumn() throws IOException { return error(); }
+    STATE endBlock() throws IOException { return error(); }
+    STATE end() throws IOException { return error(); }
+
+    private final STATE error() throws IOException {
+      throw new IOException("The file being written is in an invalid state. 
Probably caused by an error thrown previously. Current state: " + this.name());
+    }
+  }
+
+  private STATE state = STATE.NOT_STARTED;
+
+  /**
+   *
+   * @param configuration Configuration
+   * @param schema the schema of the data
+   * @param file the file to write to
+   * @throws java.io.IOException if the file can not be created
+   */
+  public ParquetFileWriter(Configuration configuration, MessageType schema, 
Path file) throws IOException {
+    super();
+    this.schema = schema;
+    FileSystem fs = file.getFileSystem(configuration);
+    this.out = fs.create(file, false);
+  }
+
+  /**
+   * start the file
+   * @throws java.io.IOException
+   */
+  public void start() throws IOException {
+    state = state.start();
+    if (DEBUG) LOG.debug(out.getPos() + ": start");
+    out.write(MAGIC);
+  }
+
+  /**
+   * start a block
+   * @param recordCount the record count in this block
+   * @throws java.io.IOException
+   */
+  public void startBlock(long recordCount) throws IOException {
+    state = state.startBlock();
+    if (DEBUG) LOG.debug(out.getPos() + ": start block");
+//    out.write(MAGIC); // TODO: add a magic delimiter
+    currentBlock = new BlockMetaData();
+    currentRecordCount = recordCount;
+  }
+
+  /**
+   * start a column inside a block
+   * @param descriptor the column descriptor
+   * @param valueCount the value count in this column
+   * @param statistics the statistics in this column
+   * @param compressionCodecName
+   * @throws java.io.IOException
+   */
+  public void startColumn(ColumnDescriptor descriptor,
+                          long valueCount,
+                          CompressionCodecName compressionCodecName) throws 
IOException {
+    state = state.startColumn();
+    if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " 
count=" + valueCount);
+    currentEncodings = new HashSet<parquet.column.Encoding>();
+    currentChunkPath = ColumnPath.get(descriptor.getPath());
+    currentChunkType = descriptor.getType();
+    currentChunkCodec = compressionCodecName;
+    currentChunkValueCount = valueCount;
+    currentChunkFirstDataPage = out.getPos();
+    compressedLength = 0;
+    uncompressedLength = 0;
+    // need to know what type of stats to initialize to
+    // better way to do this?
+    currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
+  }
+
+  /**
+   * writes a dictionary page page
+   * @param dictionaryPage the dictionary page
+   */
+  public void writeDictionaryPage(DictionaryPage dictionaryPage) throws 
IOException {
+    state = state.write();
+    if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + 
dictionaryPage.getDictionarySize() + " values");
+    currentChunkDictionaryPageOffset = out.getPos();
+    int uncompressedSize = dictionaryPage.getUncompressedSize();
+    int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: 
fix casts
+    metadataConverter.writeDictionaryPageHeader(
+        uncompressedSize,
+        compressedPageSize,
+        dictionaryPage.getDictionarySize(),
+        dictionaryPage.getEncoding(),
+        out);
+    long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
+    this.uncompressedLength += uncompressedSize + headerSize;
+    this.compressedLength += compressedPageSize + headerSize;
+    if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + 
compressedPageSize);
+    dictionaryPage.getBytes().writeAllTo(out);
+    currentEncodings.add(dictionaryPage.getEncoding());
+  }
+
+
+  /**
+   * writes a single page
+   * @param valueCount count of values
+   * @param uncompressedPageSize the size of the data once uncompressed
+   * @param bytes the compressed data for the page without header
+   * @param rlEncoding encoding of the repetition level
+   * @param dlEncoding encoding of the definition level
+   * @param valuesEncoding encoding of values
+   */
+  @Deprecated
+  public void writeDataPage(
+      int valueCount, int uncompressedPageSize,
+      BytesInput bytes,
+      parquet.column.Encoding rlEncoding,
+      parquet.column.Encoding dlEncoding,
+      parquet.column.Encoding valuesEncoding) throws IOException {
+    state = state.write();
+    long beforeHeader = out.getPos();
+    if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " 
values");
+    int compressedPageSize = (int)bytes.size();
+    metadataConverter.writeDataPageHeader(
+        uncompressedPageSize, compressedPageSize,
+        valueCount,
+        rlEncoding,
+        dlEncoding,
+        valuesEncoding,
+        out);
+    long headerSize = out.getPos() - beforeHeader;
+    this.uncompressedLength += uncompressedPageSize + headerSize;
+    this.compressedLength += compressedPageSize + headerSize;
+    if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + 
compressedPageSize);
+    bytes.writeAllTo(out);
+    currentEncodings.add(rlEncoding);
+    currentEncodings.add(dlEncoding);
+    currentEncodings.add(valuesEncoding);
+  }
+
+  /**
+   * writes a single page
+   * @param valueCount count of values
+   * @param uncompressedPageSize the size of the data once uncompressed
+   * @param bytes the compressed data for the page without header
+   * @param rlEncoding encoding of the repetition level
+   * @param dlEncoding encoding of the definition level
+   * @param valuesEncoding encoding of values
+   */
+  public void writeDataPage(
+      int valueCount, int uncompressedPageSize,
+      BytesInput bytes,
+      Statistics statistics,
+      parquet.column.Encoding rlEncoding,
+      parquet.column.Encoding dlEncoding,
+      parquet.column.Encoding valuesEncoding) throws IOException {
+    state = state.write();
+    long beforeHeader = out.getPos();
+    if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " 
values");
+    int compressedPageSize = (int)bytes.size();
+    metadataConverter.writeDataPageHeader(
+        uncompressedPageSize, compressedPageSize,
+        valueCount,
+        statistics,
+        rlEncoding,
+        dlEncoding,
+        valuesEncoding,
+        out);
+    long headerSize = out.getPos() - beforeHeader;
+    this.uncompressedLength += uncompressedPageSize + headerSize;
+    this.compressedLength += compressedPageSize + headerSize;
+    if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + 
compressedPageSize);
+    bytes.writeAllTo(out);
+    currentStatistics.mergeStatistics(statistics);
+    currentEncodings.add(rlEncoding);
+    currentEncodings.add(dlEncoding);
+    currentEncodings.add(valuesEncoding);
+  }
+
+  /**
+   * writes a number of pages at once
+   * @param bytes bytes to be written including page headers
+   * @param uncompressedTotalPageSize total uncompressed size (without page 
headers)
+   * @param compressedTotalPageSize total compressed size (without page 
headers)
+   * @throws java.io.IOException
+   */
+  void writeDataPages(BytesInput bytes,
+                      long uncompressedTotalPageSize,
+                      long compressedTotalPageSize,
+                      Statistics totalStats,
+                      List<parquet.column.Encoding> encodings) throws 
IOException {
+    state = state.write();
+    if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
+    long headersSize = bytes.size() - compressedTotalPageSize;
+    this.uncompressedLength += uncompressedTotalPageSize + headersSize;
+    this.compressedLength += compressedTotalPageSize + headersSize;
+    if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
+    bytes.writeAllTo(out);
+    currentEncodings.addAll(encodings);
+    currentStatistics = totalStats;
+  }
+
+  /**
+   * end a column (once all rep, def and data have been written)
+   * @throws java.io.IOException
+   */
+  public void endColumn() throws IOException {
+    state = state.endColumn();
+    if (DEBUG) LOG.debug(out.getPos() + ": end column");
+    currentBlock.addColumn(ColumnChunkMetaData.get(
+        currentChunkPath,
+        currentChunkType,
+        currentChunkCodec,
+        currentEncodings,
+        currentStatistics,
+        currentChunkFirstDataPage,
+        currentChunkDictionaryPageOffset,
+        currentChunkValueCount,
+        compressedLength,
+        uncompressedLength));
+    if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
+    currentColumn = null;
+    this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + 
uncompressedLength);
+    this.uncompressedLength = 0;
+    this.compressedLength = 0;
+  }
+
+  /**
+   * ends a block once all column chunks have been written
+   * @throws java.io.IOException
+   */
+  public void endBlock() throws IOException {
+    state = state.endBlock();
+    if (DEBUG) LOG.debug(out.getPos() + ": end block");
+    currentBlock.setRowCount(currentRecordCount);
+    blocks.add(currentBlock);
+    currentBlock = null;
+  }
+
+  /**
+   * ends a file once all blocks have been written.
+   * closes the file.
+   * @param extraMetaData the extra meta data to write in the footer
+   * @throws java.io.IOException
+   */
+  public void end(Map<String, String> extraMetaData) throws IOException {
+    state = state.end();
+    if (DEBUG) LOG.debug(out.getPos() + ": end");
+    ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, 
extraMetaData, Version.FULL_VERSION), blocks);
+    serializeFooter(footer, out);
+    out.close();
+  }
+
+  private static void serializeFooter(ParquetMetadata footer, 
FSDataOutputStream out) throws IOException {
+    long footerIndex = out.getPos();
+    parquet.format.FileMetaData parquetMetadata = new 
ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer);
+    writeFileMetaData(parquetMetadata, out);
+    if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - 
footerIndex));
+    BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
+    out.write(MAGIC);
+  }
+
+  /**
+   * writes a _metadata file
+   * @param configuration the configuration to use to get the FileSystem
+   * @param outputPath the directory to write the _metadata file to
+   * @param footers the list of footers to merge
+   * @throws java.io.IOException
+   */
+  public static void writeMetadataFile(Configuration configuration, Path 
outputPath, List<Footer> footers) throws IOException {
+    Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE);
+    FileSystem fs = outputPath.getFileSystem(configuration);
+    outputPath = outputPath.makeQualified(fs);
+    FSDataOutputStream metadata = fs.create(metaDataPath);
+    metadata.write(MAGIC);
+    ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
+    serializeFooter(metadataFooter, metadata);
+    metadata.close();
+  }
+
+  private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) 
{
+    String rootPath = root.toString();
+    GlobalMetaData fileMetaData = null;
+    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+    for (Footer footer : footers) {
+      String path = footer.getFile().toString();
+      if (!path.startsWith(rootPath)) {
+        throw new ParquetEncodingException(path + " invalid: all the files 
must be contained in the root " + root);
+      }
+      path = path.substring(rootPath.length());
+      while (path.startsWith("/")) {
+        path = path.substring(1);
+      }
+      fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), 
fileMetaData);
+      for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
+        block.setPath(path);
+        blocks.add(block);
+      }
+    }
+    return new ParquetMetadata(fileMetaData.merge(), blocks);
+  }
+
+  /**
+   * @return the current position in the underlying file
+   * @throws java.io.IOException
+   */
+  public long getPos() throws IOException {
+    return out.getPos();
+  }
+
+  /**
+   * Will merge the metadata of all the footers together
+   * @param footers the list files footers to merge
+   * @return the global meta data for all the footers
+   */
+  static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
+    GlobalMetaData fileMetaData = null;
+    for (Footer footer : footers) {
+      ParquetMetadata currentMetadata = footer.getParquetMetadata();
+      fileMetaData = mergeInto(currentMetadata.getFileMetaData(), 
fileMetaData);
+    }
+    return fileMetaData;
+  }
+
+  /**
+   * Will return the result of merging toMerge into mergedMetadata
+   * @param toMerge the metadata toMerge
+   * @param mergedMetadata the reference metadata to merge into
+   * @return the result of the merge
+   */
+  static GlobalMetaData mergeInto(
+      FileMetaData toMerge,
+      GlobalMetaData mergedMetadata) {
+    MessageType schema = null;
+    Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
+    Set<String> createdBy = new HashSet<String>();
+    if (mergedMetadata != null) {
+      schema = mergedMetadata.getSchema();
+      newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
+      createdBy.addAll(mergedMetadata.getCreatedBy());
+    }
+    if ((schema == null && toMerge.getSchema() != null)
+        || (schema != null && !schema.equals(toMerge.getSchema()))) {
+      schema = mergeInto(toMerge.getSchema(), schema);
+    }
+    for (Entry<String, String> entry : 
toMerge.getKeyValueMetaData().entrySet()) {
+      Set<String> values = newKeyValues.get(entry.getKey());
+      if (values == null) {
+        values = new HashSet<String>();
+        newKeyValues.put(entry.getKey(), values);
+      }
+      values.add(entry.getValue());
+    }
+    createdBy.add(toMerge.getCreatedBy());
+    return new GlobalMetaData(
+        schema,
+        newKeyValues,
+        createdBy);
+  }
+
+  /**
+   * will return the result of merging toMerge into mergedSchema
+   * @param toMerge the schema to merge into mergedSchema
+   * @param mergedSchema the schema to append the fields to
+   * @return the resulting schema
+   */
+  static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
+    if (mergedSchema == null) {
+      return toMerge;
+    }
+    return mergedSchema.union(toMerge);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
new file mode 100644
index 0000000..9c167a0
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.api.ReadSupport.ReadContext;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.GlobalMetaData;
+import parquet.schema.MessageType;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Read records from a Parquet file.
+ */
+public class ParquetReader<T> implements Closeable {
+
+  private ReadSupport<T> readSupport;
+  private UnboundRecordFilter filter;
+  private Configuration conf;
+  private ReadContext readContext;
+  private Iterator<Footer> footersIterator;
+  private InternalParquetRecordReader<T> reader;
+  private GlobalMetaData globalMetaData;
+
+  /**
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @throws java.io.IOException
+   */
+  public ParquetReader(Path file, ReadSupport<T> readSupport) throws 
IOException {
+    this(file, readSupport, null);
+  }
+
+  /**
+   * @param conf the configuration
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @throws java.io.IOException
+   */
+  public ParquetReader(Configuration conf, Path file, ReadSupport<T> 
readSupport) throws IOException {
+    this(conf, file, readSupport, null);
+  }
+
+  /**
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @param filter the filter to use to filter records
+   * @throws java.io.IOException
+   */
+  public ParquetReader(Path file, ReadSupport<T> readSupport, 
UnboundRecordFilter filter) throws IOException {
+    this(new Configuration(), file, readSupport, filter);
+  }
+
+  /**
+   * @param conf the configuration
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @param filter the filter to use to filter records
+   * @throws java.io.IOException
+   */
+  public ParquetReader(Configuration conf, Path file, ReadSupport<T> 
readSupport, UnboundRecordFilter filter) throws IOException {
+    this.readSupport = readSupport;
+    this.filter = filter;
+    this.conf = conf;
+
+    FileSystem fs = file.getFileSystem(conf);
+    List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
+    List<Footer> footers = 
ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
+    this.footersIterator = footers.iterator();
+    globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
+
+    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+    for (Footer footer : footers) {
+      blocks.addAll(footer.getParquetMetadata().getBlocks());
+    }
+
+    MessageType schema = globalMetaData.getSchema();
+    Map<String, Set<String>> extraMetadata = 
globalMetaData.getKeyValueMetaData();
+    readContext = readSupport.init(new InitContext(conf, extraMetadata, 
schema));
+  }
+
+  /**
+   * @return the next record or null if finished
+   * @throws java.io.IOException
+   */
+  public T read() throws IOException {
+    try {
+      if (reader != null && reader.nextKeyValue()) {
+        return reader.getCurrentValue();
+      } else {
+        initReader();
+        return reader == null ? null : read();
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void initReader() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+    if (footersIterator.hasNext()) {
+      Footer footer = footersIterator.next();
+      reader = new InternalParquetRecordReader<T>(readSupport, filter);
+      reader.initialize(
+          readContext.getRequestedSchema(), globalMetaData.getSchema(), 
footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
+          readContext.getReadSupportMetadata(), footer.getFile(), 
footer.getParquetMetadata().getBlocks(), conf);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
new file mode 100644
index 0000000..7527437
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.column.ParquetProperties;
+import parquet.hadoop.api.WriteSupport;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.MessageType;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class ParquetWriter<T> implements Closeable {
+
+  public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
+  public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
+  public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
+      CompressionCodecName.UNCOMPRESSED;
+  public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
+  public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
+  public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
+      ParquetProperties.WriterVersion.PARQUET_1_0;
+
+  private final InternalParquetRecordWriter<T> writer;
+
+  /**
+   * Create a new ParquetWriter.
+   * (with dictionary encoding enabled and validation off)
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a 
RecordConsumer
+   * @param compressionCodecName the compression codec to use
+   * @param blockSize the block size threshold
+   * @param pageSize the page size threshold
+   * @throws java.io.IOException
+   * @see #ParquetWriter(org.apache.hadoop.fs.Path, 
parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, 
int, int, boolean, boolean)
+   */
+  public ParquetWriter(Path file, WriteSupport<T> writeSupport, 
CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws 
IOException {
+    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
+        DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
+  }
+
+  /**
+   * Create a new ParquetWriter.
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a 
RecordConsumer
+   * @param compressionCodecName the compression codec to use
+   * @param blockSize the block size threshold
+   * @param pageSize the page size threshold (both data and dictionary)
+   * @param enableDictionary to turn dictionary encoding on
+   * @param validating to turn on validation using the schema
+   * @throws java.io.IOException
+   * @see #ParquetWriter(org.apache.hadoop.fs.Path, 
parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, 
int, int, int, boolean, boolean)
+   */
+  public ParquetWriter(
+      Path file,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      int blockSize,
+      int pageSize,
+      boolean enableDictionary,
+      boolean validating) throws IOException {
+    this(file, writeSupport, compressionCodecName, blockSize, pageSize, 
pageSize, enableDictionary, validating);
+  }
+
+  /**
+   * Create a new ParquetWriter.
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a 
RecordConsumer
+   * @param compressionCodecName the compression codec to use
+   * @param blockSize the block size threshold
+   * @param pageSize the page size threshold
+   * @param dictionaryPageSize the page size threshold for the dictionary pages
+   * @param enableDictionary to turn dictionary encoding on
+   * @param validating to turn on validation using the schema
+   * @throws java.io.IOException
+   * @see #ParquetWriter(org.apache.hadoop.fs.Path, 
parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, 
int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
+   */
+  public ParquetWriter(
+      Path file,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      int blockSize,
+      int pageSize,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating) throws IOException {
+    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
+        dictionaryPageSize, enableDictionary, validating,
+        DEFAULT_WRITER_VERSION);
+  }
+
+  /**
+   * Create a new ParquetWriter.
+   *
+   * Directly instantiates a Hadoop {@link 
org.apache.hadoop.conf.Configuration} which reads
+   * configuration from the classpath.
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a 
RecordConsumer
+   * @param compressionCodecName the compression codec to use
+   * @param blockSize the block size threshold
+   * @param pageSize the page size threshold
+   * @param dictionaryPageSize the page size threshold for the dictionary pages
+   * @param enableDictionary to turn dictionary encoding on
+   * @param validating to turn on validation using the schema
+   * @param writerVersion version of parquetWriter from {@link 
parquet.column.ParquetProperties.WriterVersion}
+   * @throws java.io.IOException
+   * @see #ParquetWriter(org.apache.hadoop.fs.Path, 
parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, 
int, int, int, boolean, boolean, 
parquet.column.ParquetProperties.WriterVersion, 
org.apache.hadoop.conf.Configuration)
+   */
+  public ParquetWriter(
+      Path file,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      int blockSize,
+      int pageSize,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      ParquetProperties.WriterVersion writerVersion) throws IOException {
+    this(file, writeSupport, compressionCodecName, blockSize, pageSize, 
dictionaryPageSize, enableDictionary, validating, writerVersion, new 
Configuration());
+  }
+
+  /**
+   * Create a new ParquetWriter.
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a 
RecordConsumer
+   * @param compressionCodecName the compression codec to use
+   * @param blockSize the block size threshold
+   * @param pageSize the page size threshold
+   * @param dictionaryPageSize the page size threshold for the dictionary pages
+   * @param enableDictionary to turn dictionary encoding on
+   * @param validating to turn on validation using the schema
+   * @param writerVersion version of parquetWriter from {@link 
parquet.column.ParquetProperties.WriterVersion}
+   * @param conf Hadoop configuration to use while accessing the filesystem
+   * @throws java.io.IOException
+   */
+  public ParquetWriter(
+      Path file,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      int blockSize,
+      int pageSize,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      ParquetProperties.WriterVersion writerVersion,
+      Configuration conf) throws IOException {
+
+    WriteSupport.WriteContext writeContext = writeSupport.init(conf);
+    MessageType schema = writeContext.getSchema();
+
+    ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
+    fileWriter.start();
+
+    CodecFactory codecFactory = new CodecFactory(conf);
+    CodecFactory.BytesCompressor compressor =  
codecFactory.getCompressor(compressionCodecName, 0);
+    this.writer = new InternalParquetRecordWriter<T>(
+        fileWriter,
+        writeSupport,
+        schema,
+        writeContext.getExtraMetaData(),
+        blockSize,
+        pageSize,
+        compressor,
+        dictionaryPageSize,
+        enableDictionary,
+        validating,
+        writerVersion);
+  }
+
+  /**
+   * Create a new ParquetWriter.  The default block size is 50 MB.The default
+   * page size is 1 MB.  Default compression is no compression. Dictionary 
encoding is disabled.
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a 
RecordConsumer
+   * @throws java.io.IOException
+   */
+  public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws 
IOException {
+    this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, 
DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+  }
+
+  public void write(T object) throws IOException {
+    try {
+      writer.write(object);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public long getEstimatedWrittenSize() throws IOException {
+    return this.writer.getEstimatedWrittenSize();
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      writer.close();
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5e5c1af0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
index ad0a92a..517e00e 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
@@ -22,8 +22,8 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.junit.Test;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
 
 import java.util.ArrayList;
 import java.util.List;

Reply via email to