http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
deleted file mode 100644
index 40d1545..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * <p>
- * Provides read and write support for Avro files. Avro schemas are
- * converted to Tajo schemas according to the following mapping of Avro
- * and Tajo types:
- * </p>
- *
- * <table>
- *   <tr>
- *     <th>Avro type</th>
- *     <th>Tajo type</th>
- *   </tr>
- *   <tr>
- *     <td>NULL</td>
- *     <td>NULL_TYPE</td>
- *   </tr>
- *   <tr>
- *     <td>BOOLEAN</td>
- *     <td>BOOLEAN</td>
- *   </tr>
- *   <tr>
- *     <td>INT</td>
- *     <td>INT4</td>
- *   </tr>
- *   <tr>
- *     <td>LONG</td>
- *     <td>INT8</td>
- *   </tr>
- *   <tr>
- *     <td>FLOAT</td>
- *     <td>FLOAT4</td>
- *   </tr>
- *   <tr>
- *     <td>DOUBLE</td>
- *     <td>FLOAT8</td>
- *   </tr>
- *   <tr>
- *     <td>BYTES</td>
- *     <td>BLOB</td>
- *   </tr>
- *   <tr>
- *     <td>STRING</td>
- *     <td>TEXT</td>
- *   </tr>
- *   <tr>
- *     <td>FIXED</td>
- *     <td>BLOB</td>
- *   </tr>
- *   <tr>
- *     <td>RECORD</td>
- *     <td>Not currently supported</td>
- *   </tr>
- *   <tr>
- *     <td>ENUM</td>
- *     <td>Not currently supported.</td>
- *   </tr>
- *   <tr>
- *     <td>MAP</td>
- *     <td>Not currently supported.</td>
- *   </tr>
- *   <tr>
- *     <td>UNION</td>
- *     <td>Not currently supported.</td>
- *   </tr>
- * </table>
- */
-
-package org.apache.tajo.storage.avro;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
deleted file mode 100644
index baeda8c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.compress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DoNotPool;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A global compressor/decompressor pool used to save and reuse (possibly
- * native) compression/decompression codecs.
- */
-public final class CodecPool {
-  private static final Log LOG = LogFactory.getLog(CodecPool.class);
-
-  /**
-   * A global compressor pool used to save the expensive
-   * construction/destruction of (possibly native) decompression codecs.
-   */
-  private static final Map<Class<Compressor>, List<Compressor>> 
COMPRESSOR_POOL =
-      new HashMap<Class<Compressor>, List<Compressor>>();
-
-  /**
-   * A global decompressor pool used to save the expensive
-   * construction/destruction of (possibly native) decompression codecs.
-   */
-  private static final Map<Class<Decompressor>, List<Decompressor>> 
DECOMPRESSOR_POOL =
-      new HashMap<Class<Decompressor>, List<Decompressor>>();
-
-  private static <T> T borrow(Map<Class<T>, List<T>> pool,
-      Class<? extends T> codecClass) {
-    T codec = null;
-
-    // Check if an appropriate codec is available
-    synchronized (pool) {
-      if (pool.containsKey(codecClass)) {
-        List<T> codecList = pool.get(codecClass);
-
-        if (codecList != null) {
-          synchronized (codecList) {
-            if (!codecList.isEmpty()) {
-              codec = codecList.remove(codecList.size() - 1);
-            }
-          }
-        }
-      }
-    }
-
-    return codec;
-  }
-
-  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
-    if (codec != null) {
-      Class<T> codecClass = (Class<T>) codec.getClass();
-      synchronized (pool) {
-        if (!pool.containsKey(codecClass)) {
-          pool.put(codecClass, new ArrayList<T>());
-        }
-
-        List<T> codecList = pool.get(codecClass);
-        synchronized (codecList) {
-          codecList.add(codec);
-        }
-      }
-    }
-  }
-
-  /**
-   * Get a {@link Compressor} for the given {@link CompressionCodec} from the
-   * pool or a new one.
-   *
-   * @param codec
-   *          the <code>CompressionCodec</code> for which to get the
-   *          <code>Compressor</code>
-   * @param conf the <code>Configuration</code> object which contains confs 
for creating or reinit the compressor
-   * @return <code>Compressor</code> for the given 
<code>CompressionCodec</code>
-   *         from the pool or a new one
-   */
-  public static Compressor getCompressor(CompressionCodec codec, Configuration 
conf) {
-    Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
-    if (compressor == null) {
-      compressor = codec.createCompressor();
-      LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
-    } else {
-      compressor.reinit(conf);
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Got recycled compressor");
-      }
-    }
-    return compressor;
-  }
-
-  public static Compressor getCompressor(CompressionCodec codec) {
-    return getCompressor(codec, null);
-  }
-
-  /**
-   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
-   * pool or a new one.
-   *
-   * @param codec
-   *          the <code>CompressionCodec</code> for which to get the
-   *          <code>Decompressor</code>
-   * @return <code>Decompressor</code> for the given
-   *         <code>CompressionCodec</code> the pool or a new one
-   */
-  public static Decompressor getDecompressor(CompressionCodec codec) {
-    Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec
-        .getDecompressorType());
-    if (decompressor == null) {
-      decompressor = codec.createDecompressor();
-      LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
-    } else {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Got recycled decompressor");
-      }
-    }
-    return decompressor;
-  }
-
-  /**
-   * Return the {@link Compressor} to the pool.
-   *
-   * @param compressor
-   *          the <code>Compressor</code> to be returned to the pool
-   */
-  public static void returnCompressor(Compressor compressor) {
-    if (compressor == null) {
-      return;
-    }
-    // if the compressor can't be reused, don't pool it.
-    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
-      return;
-    }
-    compressor.reset();
-    payback(COMPRESSOR_POOL, compressor);
-  }
-
-  /**
-   * Return the {@link Decompressor} to the pool.
-   *
-   * @param decompressor
-   *          the <code>Decompressor</code> to be returned to the pool
-   */
-  public static void returnDecompressor(Decompressor decompressor) {
-    if (decompressor == null) {
-      return;
-    }
-    // if the decompressor can't be reused, don't pool it.
-    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
-      return;
-    }
-    decompressor.reset();
-    payback(DECOMPRESSOR_POOL, decompressor);
-  }
-
-  private CodecPool() {
-    // prevent instantiation
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
deleted file mode 100644
index bb035a8..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.storage.exception;
-
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-
-public class AlreadyExistsStorageException extends IOException {
-  private static final long serialVersionUID = 965518916144019032L;
-
-
-  public AlreadyExistsStorageException(String path) {
-    super("Error: "+path+" alreay exists");    
-  }
-  
-  public AlreadyExistsStorageException(Path path) {
-    this(path.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
deleted file mode 100644
index a67d1f7..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.exception;
-
-public class UnknownCodecException extends Exception {
-
-  private static final long serialVersionUID = 4287230843540404529L;
-
-  public UnknownCodecException() {
-
-  }
-
-  public UnknownCodecException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
deleted file mode 100644
index d18b5a0..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.exception;
-
-public class UnknownDataTypeException extends Exception {
-
-  private static final long serialVersionUID = -2630390595968966164L;
-
-  public UnknownDataTypeException() {
-
-  }
-
-  public UnknownDataTypeException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
deleted file mode 100644
index 8b197d6..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.storage.exception;
-
-public class UnsupportedFileTypeException extends RuntimeException {
-       private static final long serialVersionUID = -8160289695849000342L;
-
-       public UnsupportedFileTypeException() {
-       }
-
-       /**
-        * @param message
-        */
-       public UnsupportedFileTypeException(String message) {
-               super(message);
-       }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
deleted file mode 100644
index 6fe6841..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.fragment;
-
-import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FileFragmentProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public class FileFragment implements Fragment, Comparable<FileFragment>, 
Cloneable {
-  @Expose private String tableName; // required
-  @Expose private Path uri; // required
-  @Expose private Long startOffset; // required
-  @Expose private Long length; // required
-
-  private String[] hosts; // Datanode hostnames
-  @Expose private int[] diskIds;
-
-  public FileFragment(ByteString raw) throws InvalidProtocolBufferException {
-    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
-    builder.mergeFrom(raw);
-    builder.build();
-    init(builder.build());
-  }
-
-  public FileFragment(String tableName, Path uri, BlockLocation blockLocation)
-      throws IOException {
-    this.set(tableName, uri, blockLocation.getOffset(), 
blockLocation.getLength(), blockLocation.getHosts(), null);
-  }
-
-  public FileFragment(String tableName, Path uri, long start, long length, 
String[] hosts, int[] diskIds) {
-    this.set(tableName, uri, start, length, hosts, diskIds);
-  }
-  // Non splittable
-  public FileFragment(String tableName, Path uri, long start, long length, 
String[] hosts) {
-    this.set(tableName, uri, start, length, hosts, null);
-  }
-
-  public FileFragment(String fragmentId, Path path, long start, long length) {
-    this.set(fragmentId, path, start, length, null, null);
-  }
-
-  public FileFragment(FileFragmentProto proto) {
-    init(proto);
-  }
-
-  private void init(FileFragmentProto proto) {
-    int[] diskIds = new int[proto.getDiskIdsList().size()];
-    int i = 0;
-    for(Integer eachValue: proto.getDiskIdsList()) {
-      diskIds[i++] = eachValue;
-    }
-    this.set(proto.getId(), new Path(proto.getPath()),
-        proto.getStartOffset(), proto.getLength(),
-        proto.getHostsList().toArray(new String[]{}),
-        diskIds);
-  }
-
-  private void set(String tableName, Path path, long start,
-      long length, String[] hosts, int[] diskIds) {
-    this.tableName = tableName;
-    this.uri = path;
-    this.startOffset = start;
-    this.length = length;
-    this.hosts = hosts;
-    this.diskIds = diskIds;
-  }
-
-
-  /**
-   * Get the list of hosts (hostname) hosting this block
-   */
-  public String[] getHosts() {
-    if (hosts == null) {
-      this.hosts = new String[0];
-    }
-    return hosts;
-  }
-
-  /**
-   * Get the list of Disk Ids
-   * Unknown disk is -1. Others 0 ~ N
-   */
-  public int[] getDiskIds() {
-    if (diskIds == null) {
-      this.diskIds = new int[getHosts().length];
-      Arrays.fill(this.diskIds, -1);
-    }
-    return diskIds;
-  }
-
-  public void setDiskIds(int[] diskIds){
-    this.diskIds = diskIds;
-  }
-
-  public String getTableName() {
-    return this.tableName;
-  }
-
-  public Path getPath() {
-    return this.uri;
-  }
-
-  public void setPath(Path path) {
-    this.uri = path;
-  }
-
-  public Long getStartKey() {
-    return this.startOffset;
-  }
-
-  public Long getEndKey() {
-    return this.length;
-  }
-
-  /**
-   * 
-   * The offset range of tablets <b>MUST NOT</b> be overlapped.
-   * 
-   * @param t
-   * @return If the table paths are not same, return -1.
-   */
-  @Override
-  public int compareTo(FileFragment t) {
-    if (getPath().equals(t.getPath())) {
-      long diff = this.getStartKey() - t.getStartKey();
-      if (diff < 0) {
-        return -1;
-      } else if (diff > 0) {
-        return 1;
-      } else {
-        return 0;
-      }
-    } else {
-      return -1;
-    }
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof FileFragment) {
-      FileFragment t = (FileFragment) o;
-      if (getPath().equals(t.getPath())
-          && TUtil.checkEquals(t.getStartKey(), this.getStartKey())
-          && TUtil.checkEquals(t.getEndKey(), this.getEndKey())) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(tableName, uri, startOffset, length);
-  }
-  
-  public Object clone() throws CloneNotSupportedException {
-    FileFragment frag = (FileFragment) super.clone();
-    frag.tableName = tableName;
-    frag.uri = uri;
-    frag.diskIds = diskIds;
-    frag.hosts = hosts;
-
-    return frag;
-  }
-
-  @Override
-  public String toString() {
-    return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": "
-               +getPath() + "\", \"start\": " + this.getStartKey() + 
",\"length\": "
-        + getEndKey() + "}" ;
-  }
-
-  public FragmentProto getProto() {
-    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
-    builder.setId(this.tableName);
-    builder.setStartOffset(this.startOffset);
-    builder.setLength(this.length);
-    builder.setPath(this.uri.toString());
-    if(diskIds != null) {
-      List<Integer> idList = new ArrayList<Integer>();
-      for(int eachId: diskIds) {
-        idList.add(eachId);
-      }
-      builder.addAllDiskIds(idList);
-    }
-
-    if(hosts != null) {
-      builder.addAllHosts(TUtil.newList(hosts));
-    }
-
-    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
-    fragmentBuilder.setId(this.tableName);
-    fragmentBuilder.setContents(builder.buildPartial().toByteString());
-    return fragmentBuilder.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
deleted file mode 100644
index 3f9c160..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.fragment;
-
-import org.apache.tajo.common.ProtoObject;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public interface Fragment extends ProtoObject<FragmentProto> {
-
-  public abstract String getTableName();
-
-  @Override
-  public abstract FragmentProto getProto();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
deleted file mode 100644
index 0315a8d..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.fragment;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.annotation.ThreadSafe;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-
-@ThreadSafe
-public class FragmentConvertor {
-  /**
-   * Cache of fragment classes
-   */
-  protected static final Map<String, Class<? extends Fragment>> 
CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap();
-  /**
-   * Cache of constructors for each class.
-   */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = 
Maps.newConcurrentMap();
-  /**
-   * default parameter for all constructors
-   */
-  private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class 
};
-
-  public static Class<? extends Fragment> getFragmentClass(Configuration conf, 
StoreType storeType)
-      throws IOException {
-    String handlerName = storeType.name().toLowerCase();
-    Class<? extends Fragment> fragmentClass = 
CACHED_FRAGMENT_CLASSES.get(handlerName);
-    if (fragmentClass == null) {
-      fragmentClass = conf.getClass(
-          String.format("tajo.storage.fragment.%s.class", 
storeType.name().toLowerCase()), null, Fragment.class);
-      CACHED_FRAGMENT_CLASSES.put(handlerName, fragmentClass);
-    }
-
-    if (fragmentClass == null) {
-      throw new IOException("No such a fragment for " + storeType.name());
-    }
-
-    return fragmentClass;
-  }
-
-  public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto 
fragment) {
-    T result;
-    try {
-      Constructor<T> constructor = (Constructor<T>) 
CONSTRUCTOR_CACHE.get(clazz);
-      if (constructor == null) {
-        constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS);
-        constructor.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(clazz, constructor);
-      }
-      result = constructor.newInstance(new Object[]{fragment.getContents()});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-
-  public static <T extends Fragment> T convert(Configuration conf, StoreType 
storeType, FragmentProto fragment)
-      throws IOException {
-    Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, storeType);
-    if (fragmentClass == null) {
-      throw new IOException("No such a fragment class for " + 
storeType.name());
-    }
-    return convert(fragmentClass, fragment);
-  }
-
-  public static <T extends Fragment> List<T> convert(Class<T> clazz, 
FragmentProto...fragments)
-      throws IOException {
-    List<T> list = Lists.newArrayList();
-    if (fragments == null) {
-      return list;
-    }
-    for (FragmentProto proto : fragments) {
-      list.add(convert(clazz, proto));
-    }
-    return list;
-  }
-
-  public static <T extends Fragment> List<T> convert(Configuration conf, 
StoreType storeType,
-                                                           
FragmentProto...fragments) throws IOException {
-    List<T> list = Lists.newArrayList();
-    if (fragments == null) {
-      return list;
-    }
-    for (FragmentProto proto : fragments) {
-      list.add((T) convert(conf, storeType, proto));
-    }
-    return list;
-  }
-
-  public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) 
{
-    List<FragmentProto> list = Lists.newArrayList();
-    if (fragments == null) {
-      return list;
-    }
-    for (Fragment fragment : fragments) {
-      list.add(fragment.getProto());
-    }
-    return list;
-  }
-
-  public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) {
-    List<FragmentProto> list = toFragmentProtoList(fragments);
-    return list.toArray(new FragmentProto[list.size()]);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
deleted file mode 100644
index ccba3be..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.BaseTupleComparator;
-import org.apache.tajo.storage.TupleComparator;
-
-import java.io.IOException;
-
-public interface IndexMethod {
-  IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
-      TupleComparator comparator) throws IOException;
-  IndexReader getIndexReader(final Path fileName, Schema keySchema,
-      TupleComparator comparator) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
deleted file mode 100644
index 7baf7aa..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index;
-
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-public interface IndexReader {
-  
-  /**
-   * Find the offset corresponding to key which is equal to a given key.
-   * 
-   * @param key
-   * @return
-   * @throws IOException 
-   */
-  public long find(Tuple key) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
deleted file mode 100644
index 04738f8..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.storage.index;
-
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-public abstract class IndexWriter {
-  
-  public abstract void write(Tuple key, long offset) throws IOException;
-  
-  public abstract void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
deleted file mode 100644
index 688bbc7..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.storage.index;
-
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-public interface OrderIndexReader extends IndexReader {
-  /**
-   * Find the offset corresponding to key which is equal to or greater than 
-   * a given key.
-   * 
-   * @param key to find
-   * @return
-   * @throws IOException 
-   */
-  public long find(Tuple key, boolean nextKey) throws IOException;
-  
-  /**
-   * Return the next offset from the latest find or next offset
-   * @return
-   * @throws IOException
-   */
-  public long next() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
deleted file mode 100644
index f093f9d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ /dev/null
@@ -1,623 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.index.bst;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.index.IndexMethod;
-import org.apache.tajo.storage.index.IndexWriter;
-import org.apache.tajo.storage.index.OrderIndexReader;
-
-import java.io.Closeable;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.TreeMap;
-
-import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
-
-/**
- * This is two-level binary search tree index. This is one of the value-list 
- * index structure. Thus, it is inefficient in the case where 
- * the many of the values are same. Also, the BST shows the fast performance 
- * when the selectivity of rows to be retrieved is less than 5%.
- * BSTIndexWriter is not thread-safe, whereas BSTIndexReader is thread-safe.
- */
-public class BSTIndex implements IndexMethod {
-  private static final Log LOG = LogFactory.getLog(BSTIndex.class);
-
-  public static final int ONE_LEVEL_INDEX = 1;
-  public static final int TWO_LEVEL_INDEX = 2;
-
-  private final Configuration conf;
-
-  public BSTIndex(final Configuration conf) {
-    this.conf = conf;
-  }
-  
-  @Override
-  public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema 
keySchema,
-      TupleComparator comparator) throws IOException {
-    return new BSTIndexWriter(fileName, level, keySchema, comparator);
-  }
-
-  @Override
-  public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, 
TupleComparator comparator) throws IOException {
-    return new BSTIndexReader(fileName, keySchema, comparator);
-  }
-
-  public BSTIndexReader getIndexReader(Path fileName) throws IOException {
-    return new BSTIndexReader(fileName);
-  }
-
-  public class BSTIndexWriter extends IndexWriter implements Closeable {
-    private FSDataOutputStream out;
-    private FileSystem fs;
-    private int level;
-    private int loadNum = 4096;
-    private Path fileName;
-
-    private final Schema keySchema;
-    private final TupleComparator compartor;
-    private final KeyOffsetCollector collector;
-    private KeyOffsetCollector rootCollector;
-
-    private Tuple firstKey;
-    private Tuple lastKey;
-
-    private RowStoreEncoder rowStoreEncoder;
-
-    // private Tuple lastestKey = null;
-
-    /**
-     * constructor
-     *
-     * @param level
-     *          : IndexCreater.ONE_LEVEL_INDEX or IndexCreater.TWO_LEVEL_INDEX
-     * @throws IOException
-     */
-    public BSTIndexWriter(final Path fileName, int level, Schema keySchema,
-        TupleComparator comparator) throws IOException {
-      this.fileName = fileName;
-      this.level = level;
-      this.keySchema = keySchema;
-      this.compartor = comparator;
-      this.collector = new KeyOffsetCollector(comparator);
-      this.rowStoreEncoder = RowStoreUtil.createEncoder(keySchema);
-    }
-
-   public void setLoadNum(int loadNum) {
-      this.loadNum = loadNum;
-    }
-
-    public void open() throws IOException {
-      fs = fileName.getFileSystem(conf);
-      if (fs.exists(fileName)) {
-        throw new IOException("ERROR: index file (" + fileName + " already 
exists");
-      }
-      out = fs.create(fileName);
-    }
-
-    @Override
-    public void write(Tuple key, long offset) throws IOException {
-      if (firstKey == null || compartor.compare(key, firstKey) < 0) {
-        firstKey = key;
-      }
-      if (lastKey == null || compartor.compare(lastKey, key) < 0) {
-        lastKey = key;
-      }
-
-      collector.put(key, offset);
-    }
-
-    public TupleComparator getComparator() {
-      return this.compartor;
-    }
-
-    public void flush() throws IOException {
-      out.flush();
-    }
-
-    public void writeHeader(int entryNum) throws IOException {
-      // schema
-      byte [] schemaBytes = keySchema.getProto().toByteArray();
-      out.writeInt(schemaBytes.length);
-      out.write(schemaBytes);
-
-      // comparator
-      byte [] comparatorBytes = compartor.getProto().toByteArray();
-      out.writeInt(comparatorBytes.length);
-      out.write(comparatorBytes);
-
-      // level
-      out.writeInt(this.level);
-      // entry
-      out.writeInt(entryNum);
-      if (entryNum > 0) {
-        byte [] minBytes = rowStoreEncoder.toBytes(firstKey);
-        out.writeInt(minBytes.length);
-        out.write(minBytes);
-        byte [] maxBytes = rowStoreEncoder.toBytes(lastKey);
-        out.writeInt(maxBytes.length);
-        out.write(maxBytes);
-      }
-      out.flush();
-    }
-
-    public void close() throws IOException {
-      /* two level initialize */
-      if (this.level == TWO_LEVEL_INDEX) {
-        rootCollector = new KeyOffsetCollector(this.compartor);
-      }
-
-      /* data writing phase */
-      TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap();
-      Set<Tuple> keySet = keyOffsetMap.keySet();
-
-      int entryNum = keySet.size();
-      writeHeader(entryNum);
-      
-      int loadCount = this.loadNum - 1;
-      for (Tuple key : keySet) {
-
-        if (this.level == TWO_LEVEL_INDEX) {
-          loadCount++;
-          if (loadCount == this.loadNum) {
-            rootCollector.put(key, out.getPos());
-            loadCount = 0;
-          }
-        }
-        /* key writing */
-        byte[] buf = rowStoreEncoder.toBytes(key);
-        out.writeInt(buf.length);
-        out.write(buf);
-        
-        /**/
-        LinkedList<Long> offsetList = keyOffsetMap.get(key);
-        /* offset num writing */
-        int offsetSize = offsetList.size();
-        out.writeInt(offsetSize);
-        /* offset writing */
-        for (Long offset : offsetList) {
-          out.writeLong(offset);
-        }
-      }
-
-      out.flush();
-      out.close();
-      keySet.clear();
-      collector.clear();
-
-      FSDataOutputStream rootOut = null;
-      /* root index creating phase */
-      if (this.level == TWO_LEVEL_INDEX) {
-        TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap();
-        keySet = rootMap.keySet();
-
-        rootOut = fs.create(new Path(fileName + ".root"));
-        rootOut.writeInt(this.loadNum);
-        rootOut.writeInt(keySet.size());
-
-        /* root key writing */
-        for (Tuple key : keySet) {
-          byte[] buf = rowStoreEncoder.toBytes(key);
-          rootOut.writeInt(buf.length);
-          rootOut.write(buf);
-
-          LinkedList<Long> offsetList = rootMap.get(key);
-          if (offsetList.size() > 1 || offsetList.size() == 0) {
-            throw new IOException("Why root index doen't have one offset?");
-          }
-          rootOut.writeLong(offsetList.getFirst());
-
-        }
-        rootOut.flush();
-        rootOut.close();
-
-        keySet.clear();
-        rootCollector.clear();
-      }
-    }
-
-    private class KeyOffsetCollector {
-      private TreeMap<Tuple, LinkedList<Long>> map;
-
-      public KeyOffsetCollector(TupleComparator comparator) {
-        map = new TreeMap<Tuple, LinkedList<Long>>(comparator);
-      }
-
-      public void put(Tuple key, long offset) {
-        if (map.containsKey(key)) {
-          map.get(key).add(offset);
-        } else {
-          LinkedList<Long> list = new LinkedList<Long>();
-          list.add(offset);
-          map.put(key, list);
-        }
-      }
-
-      public TreeMap<Tuple, LinkedList<Long>> getMap() {
-        return this.map;
-      }
-
-      public void clear() {
-        this.map.clear();
-      }
-    }
-  }
-
-  /**
-   * BSTIndexReader is thread-safe.
-   */
-  public class BSTIndexReader implements OrderIndexReader , Closeable{
-    private Path fileName;
-    private Schema keySchema;
-    private TupleComparator comparator;
-
-    private FileSystem fs;
-    private FSDataInputStream indexIn;
-    private FSDataInputStream subIn;
-
-    private int level;
-    private int entryNum;
-    private int loadNum = -1;
-    private Tuple firstKey;
-    private Tuple lastKey;
-
-    // the cursors of BST
-    private int rootCursor;
-    private int keyCursor;
-    private int offsetCursor;
-
-    // mutex
-    private final Object mutex = new Object();
-
-    private RowStoreDecoder rowStoreDecoder;
-
-    /**
-     *
-     * @param fileName
-     * @param keySchema
-     * @param comparator
-     * @throws IOException
-     */
-    public BSTIndexReader(final Path fileName, Schema keySchema, 
TupleComparator comparator) throws IOException {
-      this.fileName = fileName;
-      this.keySchema = keySchema;
-      this.comparator = comparator;
-      this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
-    }
-
-    public BSTIndexReader(final Path fileName) throws IOException {
-      this.fileName = fileName;
-    }
-
-    public Schema getKeySchema() {
-      return this.keySchema;
-    }
-
-    public TupleComparator getComparator() {
-      return this.comparator;
-    }
-
-    private void readHeader() throws IOException {
-      // schema
-      int schemaByteSize = indexIn.readInt();
-      byte [] schemaBytes = new byte[schemaByteSize];
-      StorageUtil.readFully(indexIn, schemaBytes, 0, schemaByteSize);
-
-      SchemaProto.Builder builder = SchemaProto.newBuilder();
-      builder.mergeFrom(schemaBytes);
-      SchemaProto proto = builder.build();
-      this.keySchema = new Schema(proto);
-      this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
-
-      // comparator
-      int compByteSize = indexIn.readInt();
-      byte [] compBytes = new byte[compByteSize];
-      StorageUtil.readFully(indexIn, compBytes, 0, compByteSize);
-
-      TupleComparatorProto.Builder compProto = 
TupleComparatorProto.newBuilder();
-      compProto.mergeFrom(compBytes);
-      this.comparator = new BaseTupleComparator(compProto.build());
-
-      // level
-      this.level = indexIn.readInt();
-      // entry
-      this.entryNum = indexIn.readInt();
-      if (entryNum > 0) { // if there is no any entry, do not read 
firstKey/lastKey values
-        byte [] minBytes = new byte[indexIn.readInt()];
-        StorageUtil.readFully(indexIn, minBytes, 0, minBytes.length);
-        this.firstKey = rowStoreDecoder.toTuple(minBytes);
-
-        byte [] maxBytes = new byte[indexIn.readInt()];
-        StorageUtil.readFully(indexIn, maxBytes, 0, maxBytes.length);
-        this.lastKey = rowStoreDecoder.toTuple(maxBytes);
-      }
-    }
-
-    public void open()
-        throws IOException {
-      /* init the index file */
-      fs = fileName.getFileSystem(conf);
-      if (!fs.exists(fileName)) {
-        throw new FileNotFoundException("ERROR: does not exist " + 
fileName.toString());
-      }
-
-      indexIn = fs.open(this.fileName);
-      readHeader();
-      fillData();
-    }
-
-    private void fillData() throws IOException {
-      /* load on memory */
-      if (this.level == TWO_LEVEL_INDEX) {
-
-        Path rootPath = new Path(this.fileName + ".root");
-        if (!fs.exists(rootPath)) {
-          throw new FileNotFoundException("root index did not created");
-        }
-
-        subIn = indexIn;
-        indexIn = fs.open(rootPath);
-        /* root index header reading : type => loadNum => indexSize */
-        this.loadNum = indexIn.readInt();
-        this.entryNum = indexIn.readInt();
-        /**/
-        fillRootIndex(entryNum, indexIn);
-
-      } else {
-        fillLeafIndex(entryNum, indexIn, -1);
-      }
-    }
-
-    /**
-     *
-     * @return
-     * @throws IOException
-     */
-    public long find(Tuple key) throws IOException {
-      return find(key, false);
-    }
-
-    @Override
-    public long find(Tuple key, boolean nextKey) throws IOException {
-      synchronized (mutex) {
-        int pos = -1;
-        if (this.level == ONE_LEVEL_INDEX) {
-            pos = oneLevBS(key);
-        } else if (this.level == TWO_LEVEL_INDEX) {
-            pos = twoLevBS(key, this.loadNum + 1);
-        } else {
-          throw new IOException("More than TWL_LEVEL_INDEX is not supported.");
-        }
-
-        if (nextKey) {
-          if (pos + 1 >= this.offsetSubIndex.length) {
-            return -1;
-          }
-          keyCursor = pos + 1;
-          offsetCursor = 0;
-        } else {
-          if (correctable) {
-            keyCursor = pos;
-            offsetCursor = 0;
-          } else {
-            return -1;
-          }
-        }
-
-        return this.offsetSubIndex[keyCursor][offsetCursor];
-      }
-    }
-
-    public long next() throws IOException {
-      synchronized (mutex) {
-        if (offsetSubIndex[keyCursor].length - 1 > offsetCursor) {
-          offsetCursor++;
-        } else {
-          if (offsetSubIndex.length - 1 > keyCursor) {
-            keyCursor++;
-            offsetCursor = 0;
-          } else {
-            if (offsetIndex.length -1 > rootCursor) {
-              rootCursor++;
-              fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]);
-              keyCursor = 1;
-              offsetCursor = 0;
-            } else {
-              return -1;
-            }
-          }
-        }
-
-        return this.offsetSubIndex[keyCursor][offsetCursor];
-      }
-    }
-    
-    public boolean isCurInMemory() {
-      return (offsetSubIndex[keyCursor].length - 1 >= offsetCursor);
-    }
-
-    private void fillLeafIndex(int entryNum, FSDataInputStream in, long pos)
-        throws IOException {
-      int counter = 0;
-      try {
-        if (pos != -1) {
-          in.seek(pos);
-        }
-        this.dataSubIndex = new Tuple[entryNum];
-        this.offsetSubIndex = new long[entryNum][];
-
-        byte[] buf;
-        for (int i = 0; i < entryNum; i++) {
-          counter++;
-          buf = new byte[in.readInt()];
-          StorageUtil.readFully(in, buf, 0, buf.length);
-          dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
-
-          int offsetNum = in.readInt();
-          this.offsetSubIndex[i] = new long[offsetNum];
-          for (int j = 0; j < offsetNum; j++) {
-            this.offsetSubIndex[i][j] = in.readLong();
-          }
-
-        }
-
-      } catch (IOException e) {
-        counter--;
-        if (pos != -1) {
-          in.seek(pos);
-        }
-        this.dataSubIndex = new Tuple[counter];
-        this.offsetSubIndex = new long[counter][];
-
-        byte[] buf;
-        for (int i = 0; i < counter; i++) {
-          buf = new byte[in.readInt()];
-          StorageUtil.readFully(in, buf, 0, buf.length);
-          dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
-
-          int offsetNum = in.readInt();
-          this.offsetSubIndex[i] = new long[offsetNum];
-          for (int j = 0; j < offsetNum; j++) {
-            this.offsetSubIndex[i][j] = in.readLong();
-          }
-
-        }
-      }
-    }
-
-    public Tuple getFirstKey() {
-      return this.firstKey;
-    }
-
-    public Tuple getLastKey() {
-      return this.lastKey;
-    }
-
-    private void fillRootIndex(int entryNum, FSDataInputStream in)
-        throws IOException {
-      this.dataIndex = new Tuple[entryNum];
-      this.offsetIndex = new long[entryNum];
-      Tuple keyTuple;
-      byte[] buf;
-      for (int i = 0; i < entryNum; i++) {
-        buf = new byte[in.readInt()];
-        StorageUtil.readFully(in, buf, 0, buf.length);
-        keyTuple = rowStoreDecoder.toTuple(buf);
-        dataIndex[i] = keyTuple;
-        this.offsetIndex[i] = in.readLong();
-      }
-    }
-
-    /* memory index, only one is used. */
-    private Tuple[] dataIndex = null;
-    private Tuple[] dataSubIndex = null;
-
-    /* offset index */
-    private long[] offsetIndex = null;
-    private long[][] offsetSubIndex = null;
-
-    private boolean correctable = true;
-
-    private int oneLevBS(Tuple key) throws IOException {
-      correctable = true;
-      int pos = binarySearch(this.dataSubIndex, key, 0, 
this.dataSubIndex.length);
-      return pos;
-    }
-
-    private int twoLevBS(Tuple key, int loadNum) throws IOException {
-      int pos = binarySearch(this.dataIndex, key, 0, this.dataIndex.length);
-      if(pos > 0) {
-        rootCursor = pos;
-      } else {
-        rootCursor = 0;
-      }
-      fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]);
-      pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
-       
-      return pos;
-    }
-
-    private int binarySearch(Tuple[] arr, Tuple key, int startPos, int endPos) 
{
-      int offset = -1;
-      int start = startPos;
-      int end = endPos;
-
-      //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541
-      int centerPos = (start + end) >>> 1;
-      while (true) {
-        if (comparator.compare(arr[centerPos], key) > 0) {
-          if (centerPos == 0) {
-            correctable = false;
-            break;
-          } else if (comparator.compare(arr[centerPos - 1], key) < 0) {
-            correctable = false;
-            offset = centerPos - 1;
-            break;
-          } else {
-            end = centerPos;
-            centerPos = (start + end) / 2;
-          }
-        } else if (comparator.compare(arr[centerPos], key) < 0) {
-          if (centerPos == arr.length - 1) {
-            correctable = false;
-            offset = centerPos;
-            break;
-          } else if (comparator.compare(arr[centerPos + 1], key) > 0) {
-            correctable = false;
-            offset = centerPos;
-            break;
-          } else {
-            start = centerPos + 1;
-            centerPos = (start + end) / 2;
-          }
-        } else {
-          correctable = true;
-          offset = centerPos;
-          break;
-        }
-      }
-      return offset;
-    }
-
-    @Override
-    public void close() throws IOException {
-      this.indexIn.close();
-      this.subIn.close();
-    }
-
-    @Override
-    public String toString() {
-      return "BSTIndex (" + firstKey + ", " + lastKey + ") " + fileName;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
deleted file mode 100644
index dfe36f6..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.json;
-
-
-import io.netty.buffer.ByteBuf;
-import net.minidev.json.JSONArray;
-import net.minidev.json.JSONObject;
-import net.minidev.json.parser.JSONParser;
-import net.minidev.json.parser.ParseException;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.common.exception.NotImplementedException;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.text.TextLineDeserializer;
-import org.apache.tajo.storage.text.TextLineParsingError;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-public class JsonLineDeserializer extends TextLineDeserializer {
-  private JSONParser parser;
-  private Type [] types;
-  private String [] columnNames;
-
-  public JsonLineDeserializer(Schema schema, TableMeta meta, int[] 
targetColumnIndexes) {
-    super(schema, meta, targetColumnIndexes);
-  }
-
-  @Override
-  public void init() {
-    types = SchemaUtil.toTypes(schema);
-    columnNames = SchemaUtil.toSimpleNames(schema);
-
-    parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE);
-  }
-
-  @Override
-  public void deserialize(ByteBuf buf, Tuple output) throws IOException, 
TextLineParsingError {
-    byte [] line = new byte[buf.readableBytes()];
-    buf.readBytes(line);
-
-    try {
-      JSONObject object = (JSONObject) parser.parse(line);
-
-      for (int i = 0; i < targetColumnIndexes.length; i++) {
-        int actualIdx = targetColumnIndexes[i];
-        String fieldName = columnNames[actualIdx];
-
-        if (!object.containsKey(fieldName)) {
-          output.put(actualIdx, NullDatum.get());
-          continue;
-        }
-
-        switch (types[actualIdx]) {
-        case BOOLEAN:
-          String boolStr = object.getAsString(fieldName);
-          if (boolStr != null) {
-            output.put(actualIdx, 
DatumFactory.createBool(boolStr.equals("true")));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case CHAR:
-          String charStr = object.getAsString(fieldName);
-          if (charStr != null) {
-            output.put(actualIdx, DatumFactory.createChar(charStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case INT1:
-        case INT2:
-          Number int2Num = object.getAsNumber(fieldName);
-          if (int2Num != null) {
-            output.put(actualIdx, 
DatumFactory.createInt2(int2Num.shortValue()));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case INT4:
-          Number int4Num = object.getAsNumber(fieldName);
-          if (int4Num != null) {
-            output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue()));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case INT8:
-          Number int8Num = object.getAsNumber(fieldName);
-          if (int8Num != null) {
-            output.put(actualIdx, 
DatumFactory.createInt8(int8Num.longValue()));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case FLOAT4:
-          Number float4Num = object.getAsNumber(fieldName);
-          if (float4Num != null) {
-            output.put(actualIdx, 
DatumFactory.createFloat4(float4Num.floatValue()));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case FLOAT8:
-          Number float8Num = object.getAsNumber(fieldName);
-          if (float8Num != null) {
-            output.put(actualIdx, 
DatumFactory.createFloat8(float8Num.doubleValue()));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case TEXT:
-          String textStr = object.getAsString(fieldName);
-          if (textStr != null) {
-            output.put(actualIdx, DatumFactory.createText(textStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case TIMESTAMP:
-          String timestampStr = object.getAsString(fieldName);
-          if (timestampStr != null) {
-            output.put(actualIdx, DatumFactory.createTimestamp(timestampStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case TIME:
-          String timeStr = object.getAsString(fieldName);
-          if (timeStr != null) {
-            output.put(actualIdx, DatumFactory.createTime(timeStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case DATE:
-          String dateStr = object.getAsString(fieldName);
-          if (dateStr != null) {
-            output.put(actualIdx, DatumFactory.createDate(dateStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case BIT:
-        case BINARY:
-        case VARBINARY:
-        case BLOB: {
-          Object jsonObject = object.get(fieldName);
-
-          if (jsonObject == null) {
-            output.put(actualIdx, NullDatum.get());
-            break;
-          }
-          if (jsonObject instanceof String) {
-            output.put(actualIdx, DatumFactory.createBlob((String) 
jsonObject));
-          } else if (jsonObject instanceof JSONArray) {
-            JSONArray jsonArray = (JSONArray) jsonObject;
-            byte[] bytes = new byte[jsonArray.size()];
-            Iterator<Object> it = jsonArray.iterator();
-            int arrayIdx = 0;
-            while (it.hasNext()) {
-              bytes[arrayIdx++] = ((Long) it.next()).byteValue();
-            }
-            if (bytes.length > 0) {
-              output.put(actualIdx, DatumFactory.createBlob(bytes));
-            } else {
-              output.put(actualIdx, NullDatum.get());
-            }
-            break;
-          } else {
-            throw new IOException("Unknown json object: " + 
object.getClass().getSimpleName());
-          }
-          break;
-        }
-        case INET4:
-          String inetStr = object.getAsString(fieldName);
-          if (inetStr != null) {
-            output.put(actualIdx, DatumFactory.createInet4(inetStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-
-        case NULL_TYPE:
-          output.put(actualIdx, NullDatum.get());
-          break;
-
-        default:
-          throw new NotImplementedException(types[actualIdx].name() + " is not 
supported.");
-        }
-      }
-    } catch (ParseException pe) {
-      throw new TextLineParsingError(new String(line, 
TextDatum.DEFAULT_CHARSET), pe);
-    } catch (Throwable e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public void release() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
deleted file mode 100644
index 6db2c29..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.json;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.text.TextLineDeserializer;
-import org.apache.tajo.storage.text.TextLineSerDe;
-import org.apache.tajo.storage.text.TextLineSerializer;
-
/**
 * A {@link TextLineSerDe} implementation for JSON-formatted text lines.
 * Acts as a factory that pairs {@link JsonLineDeserializer} (text line to
 * {@code Tuple}) with {@link JsonLineSerializer} ({@code Tuple} to text line).
 */
public class JsonLineSerDe extends TextLineSerDe {
  /**
   * Creates a deserializer that parses one JSON object per text line.
   *
   * @param schema the table schema
   * @param meta the table metadata
   * @param targetColumnIndexes indexes of the columns to materialize
   * @return a JSON text-line deserializer
   */
  @Override
  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
    return new JsonLineDeserializer(schema, meta, targetColumnIndexes);
  }

  /**
   * Creates a serializer that writes one JSON object per text line.
   *
   * @param schema the table schema
   * @param meta the table metadata
   * @return a JSON text-line serializer
   */
  @Override
  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
    return new JsonLineSerializer(schema, meta);
  }
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
deleted file mode 100644
index c7007d8..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.json;
-
-
-import net.minidev.json.JSONObject;
-import org.apache.commons.lang.CharSet;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.common.exception.NotImplementedException;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.text.TextLineSerDe;
-import org.apache.tajo.storage.text.TextLineSerializer;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-
-public class JsonLineSerializer extends TextLineSerializer {
-  private static ProtobufJsonFormat protobufJsonFormat = 
ProtobufJsonFormat.getInstance();
-
-  private Type [] types;
-  private String [] simpleNames;
-  private int columnNum;
-
-
-  public JsonLineSerializer(Schema schema, TableMeta meta) {
-    super(schema, meta);
-  }
-
-  @Override
-  public void init() {
-    types = SchemaUtil.toTypes(schema);
-    simpleNames = SchemaUtil.toSimpleNames(schema);
-    columnNum = schema.size();
-  }
-
-  @Override
-  public int serialize(OutputStream out, Tuple input) throws IOException {
-    JSONObject jsonObject = new JSONObject();
-
-    for (int i = 0; i < columnNum; i++) {
-      if (input.isNull(i)) {
-        continue;
-      }
-
-      String fieldName = simpleNames[i];
-      Type type = types[i];
-
-      switch (type) {
-
-      case BOOLEAN:
-        jsonObject.put(fieldName, input.getBool(i));
-        break;
-
-      case INT1:
-      case INT2:
-        jsonObject.put(fieldName, input.getInt2(i));
-        break;
-
-      case INT4:
-        jsonObject.put(fieldName, input.getInt4(i));
-        break;
-
-      case INT8:
-        jsonObject.put(fieldName, input.getInt8(i));
-        break;
-
-      case FLOAT4:
-        jsonObject.put(fieldName, input.getFloat4(i));
-        break;
-
-      case FLOAT8:
-        jsonObject.put(fieldName, input.getFloat8(i));
-        break;
-
-      case CHAR:
-      case TEXT:
-      case VARCHAR:
-      case INET4:
-      case TIMESTAMP:
-      case DATE:
-      case TIME:
-      case INTERVAL:
-        jsonObject.put(fieldName, input.getText(i));
-        break;
-
-      case BIT:
-      case BINARY:
-      case BLOB:
-      case VARBINARY:
-        jsonObject.put(fieldName, input.getBytes(i));
-        break;
-
-      case NULL_TYPE:
-        break;
-
-      default:
-        throw new NotImplementedException(types[i].name() + " is not 
supported.");
-      }
-    }
-
-    String jsonStr = jsonObject.toJSONString();
-    byte [] jsonBytes = jsonStr.getBytes(TextDatum.DEFAULT_CHARSET);
-    out.write(jsonBytes);
-    return jsonBytes.length;
-  }
-
-  @Override
-  public void release() {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
deleted file mode 100644
index 3a3bb57..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.tajo.storage.StorageConstants;
-import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.storage.FileAppender;
-import org.apache.tajo.storage.TableStatistics;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-/**
- * FileAppender for writing to Parquet files.
- */
-public class ParquetAppender extends FileAppender {
-  private TajoParquetWriter writer;
-  private int blockSize;
-  private int pageSize;
-  private CompressionCodecName compressionCodecName;
-  private boolean enableDictionary;
-  private boolean validating;
-  private TableStatistics stats;
-
-  /**
-   * Creates a new ParquetAppender.
-   *
-   * @param conf Configuration properties.
-   * @param schema The table schema.
-   * @param meta The table metadata.
-   * @param path The path of the Parquet file to write to.
-   */
-  public ParquetAppender(Configuration conf, Schema schema, TableMeta meta,
-                         Path path) throws IOException {
-    super(conf, schema, meta, path);
-    this.blockSize = Integer.parseInt(
-        meta.getOption(ParquetOutputFormat.BLOCK_SIZE, 
StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE));
-    this.pageSize = Integer.parseInt(
-        meta.getOption(ParquetOutputFormat.PAGE_SIZE, 
StorageConstants.PARQUET_DEFAULT_PAGE_SIZE));
-    this.compressionCodecName = CompressionCodecName.fromConf(
-        meta.getOption(ParquetOutputFormat.COMPRESSION, 
StorageConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME));
-    this.enableDictionary = Boolean.parseBoolean(
-        meta.getOption(ParquetOutputFormat.ENABLE_DICTIONARY, 
StorageConstants.PARQUET_DEFAULT_IS_DICTIONARY_ENABLED));
-    this.validating = Boolean.parseBoolean(
-        meta.getOption(ParquetOutputFormat.VALIDATION, 
StorageConstants.PARQUET_DEFAULT_IS_VALIDATION_ENABLED));
-  }
-
-  /**
-   * Initializes the Appender. This method creates a new TajoParquetWriter
-   * and initializes the table statistics if enabled.
-   */
-  public void init() throws IOException {
-    writer = new TajoParquetWriter(path,
-                                   schema,
-                                   compressionCodecName,
-                                   blockSize,
-                                   pageSize,
-                                   enableDictionary,
-                                   validating);
-    if (enabledStats) {
-      this.stats = new TableStatistics(schema);
-    }
-    super.init();
-  }
-
-  /**
-   * Gets the current offset. Tracking offsets is currenly not implemented, so
-   * this method always returns 0.
-   *
-   * @return 0
-   */
-  @Override
-  public long getOffset() throws IOException {
-    return 0;
-  }
-
-  /**
-   * Write a Tuple to the Parquet file.
-   *
-   * @param tuple The Tuple to write.
-   */
-  @Override
-  public void addTuple(Tuple tuple) throws IOException {
-    if (enabledStats) {
-      for (int i = 0; i < schema.size(); ++i) {
-        stats.analyzeField(i, tuple.get(i));
-      }
-    }
-    writer.write(tuple);
-    if (enabledStats) {
-      stats.incrementRow();
-    }
-  }
-
-  /**
-   * The ParquetWriter does not need to be flushed, so this is a no-op.
-   */
-  @Override
-  public void flush() throws IOException {
-  }
-
-  /**
-   * Closes the Appender.
-   */
-  @Override
-  public void close() throws IOException {
-    writer.close();
-  }
-
-  public long getEstimatedOutputSize() throws IOException {
-    return writer.getEstimatedWrittenSize();
-  }
-
-  /**
-   * If table statistics is enabled, retrieve the table statistics.
-   *
-   * @return Table statistics if enabled or null otherwise.
-   */
-  @Override
-  public TableStats getStats() {
-    if (enabledStats) {
-      return stats.getTableStat();
-    } else {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
deleted file mode 100644
index 36b89b8..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.FileScanner;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-
-/**
- * FileScanner for reading Parquet files
- */
-public class ParquetScanner extends FileScanner {
-  private TajoParquetReader reader;
-
-  /**
-   * Creates a new ParquetScanner.
-   *
-   * @param conf
-   * @param schema
-   * @param meta
-   * @param fragment
-   */
-  public ParquetScanner(Configuration conf, final Schema schema,
-                        final TableMeta meta, final FileFragment fragment) {
-    super(conf, schema, meta, fragment);
-  }
-
-  /**
-   * Initializes the ParquetScanner. This method initializes the
-   * TajoParquetReader.
-   */
-  @Override
-  public void init() throws IOException {
-    if (targets == null) {
-      targets = schema.toArray();
-    }
-    reader = new TajoParquetReader(fragment.getPath(), schema, new 
Schema(targets));
-    super.init();
-  }
-
-  /**
-   * Reads the next Tuple from the Parquet file.
-   *
-   * @return The next Tuple from the Parquet file or null if end of file is
-   *         reached.
-   */
-  @Override
-  public Tuple next() throws IOException {
-    return reader.read();
-  }
-
-  /**
-   * Resets the scanner
-   */
-  @Override
-  public void reset() throws IOException {
-  }
-
-  /**
-   * Closes the scanner.
-   */
-  @Override
-  public void close() throws IOException {
-    if (reader != null) {
-      reader.close();
-    }
-  }
-
-  /**
-   * Returns whether this scanner is projectable.
-   *
-   * @return true
-   */
-  @Override
-  public boolean isProjectable() {
-    return true;
-  }
-
-  /**
-   * Returns whether this scanner is selectable.
-   *
-   * @return false
-   */
-  @Override
-  public boolean isSelectable() {
-    return false;
-  }
-
-  /**
-   * Returns whether this scanner is splittable.
-   *
-   * @return false
-   */
-  @Override
-  public boolean isSplittable() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
deleted file mode 100644
index a765f48..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.thirdparty.parquet.ParquetReader;
-import parquet.filter.UnboundRecordFilter;
-
-import java.io.IOException;
-
-/**
- * Tajo implementation of {@link ParquetReader} to read Tajo records from a
- * Parquet file. Users should use {@link ParquetScanner} and not this class
- * directly.
- */
-public class TajoParquetReader extends ParquetReader<Tuple> {
-  /**
-   * Creates a new TajoParquetReader.
-   *
-   * @param file The file to read from.
-   * @param readSchema Tajo schema of the table.
-   */
-  public TajoParquetReader(Path file, Schema readSchema) throws IOException {
-    super(file, new TajoReadSupport(readSchema));
-  }
-
-  /**
-   * Creates a new TajoParquetReader.
-   *
-   * @param file The file to read from.
-   * @param readSchema Tajo schema of the table.
-   * @param requestedSchema Tajo schema of the projection.
-   */
-  public TajoParquetReader(Path file, Schema readSchema,
-                           Schema requestedSchema) throws IOException {
-    super(file, new TajoReadSupport(readSchema, requestedSchema));
-  }
-
-  /**
-   * Creates a new TajoParquetReader.
-   *
-   * @param file The file to read from.
-   * @param readSchema Tajo schema of the table.
-   * @param recordFilter Record filter.
-   */
-  public TajoParquetReader(Path file, Schema readSchema,
-                           UnboundRecordFilter recordFilter)
-      throws IOException {
-    super(file, new TajoReadSupport(readSchema), recordFilter);
-  }
-
-  /**
-   * Creates a new TajoParquetReader.
-   *
-   * @param file The file to read from.
-   * @param readSchema Tajo schema of the table.
-   * @param requestedSchema Tajo schema of the projection.
-   * @param recordFilter Record filter.
-   */
-  public TajoParquetReader(Path file, Schema readSchema,
-                           Schema requestedSchema,
-                           UnboundRecordFilter recordFilter)
-      throws IOException {
-    super(file, new TajoReadSupport(readSchema, requestedSchema),
-          recordFilter);
-  }
-}

Reply via email to