http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
deleted file mode 100644
index 16c4faa..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.SchemaObject;
-import org.apache.tajo.catalog.statistics.TableStats;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Scanner Interface
- */
-public interface Scanner extends SchemaObject, Closeable {
-
-  void init() throws IOException;
-
-  /**
-   * It returns one tuple at each call. 
-   * 
-   * @return retrieve null if the scanner has no more tuples. 
-   * Otherwise it returns one tuple.
-   * 
-   * @throws IOException if internal I/O error occurs during next method
-   */
-  Tuple next() throws IOException;
-  
-  /**
-   * Reset the cursor. After executed, the scanner 
-   * will retrieve the first tuple.
-   * 
-   * @throws IOException if internal I/O error occurs during reset method
-   */
-  void reset() throws IOException;
-  
-  /**
-   * Close scanner
-   * 
-   * @throws IOException if internal I/O error occurs during close method
-   */
-  void close() throws IOException;
-
-
-  /**
-   * It returns if the projection is executed in the underlying scanner layer.
-   *
-   * @return true if this scanner can project the given columns.
-   */
-  boolean isProjectable();
-
-  /**
-   * Set target columns
-   * @param targets columns to be projected
-   */
-  void setTarget(Column [] targets);
-
-  /**
-   * It returns if the selection is executed in the underlying scanner layer.
-   *
-   * @return true if this scanner can filter tuples against a given condition.
-   */
-  boolean isSelectable();
-
-  /**
-   * Set a search condition
-   * @param expr to be searched
-   *
-   * TODO - to be changed Object type
-   */
-  void setSearchCondition(Object expr);
-
-  /**
-   * It returns if the file is splittable.
-   *
-   * @return true if this scanner can split the a file.
-   */
-  boolean isSplittable();
-
-  /**
-   * How much of the input has the Scanner consumed
-   * @return progress from <code>0.0</code> to <code>1.0</code>.
-   */
-  float getProgress();
-
-  TableStats getInputStats();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
deleted file mode 100644
index 894e7ee..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import java.io.IOException;
-
-public interface SeekableScanner extends Scanner {
-
-  public abstract long getNextOffset() throws IOException;
-
-  public abstract void seek(long offset) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
deleted file mode 100644
index 564a9f5..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.Datum;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-@Deprecated
-public interface SerializerDeserializer {
-
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] 
nullCharacters) throws IOException;
-
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, 
byte[] nullCharacters) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
deleted file mode 100644
index 3579674..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class SplitLineReader extends LineReader {
-  public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
-    super(in, recordDelimiterBytes);
-  }
-
-  public SplitLineReader(InputStream in, Configuration conf,
-                         byte[] recordDelimiterBytes) throws IOException {
-    super(in, conf, recordDelimiterBytes);
-  }
-
-  public boolean needAdditionalRecordAfterSplit() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
deleted file mode 100644
index cc85c1d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-
-public abstract class Storage {
-  protected final Configuration conf;
-  
-  public Storage(final Configuration conf) {
-    this.conf = conf;
-  }
-  
-  public Configuration getConf() {
-    return this.conf;
-  }
-  
-  public abstract Appender getAppender(TableMeta meta, Path path)
-    throws IOException;
-
-  public abstract Scanner openScanner(Schema schema, FileFragment[] tablets)
-    throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
deleted file mode 100644
index e37be58..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ /dev/null
@@ -1,926 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.plan.rewrite.RewriteRule;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.storage.hbase.HBaseStorageManager;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.URI;
-import java.text.NumberFormat;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * StorageManager manages the functions of storing and reading data.
- * StorageManager is a abstract class.
- * For supporting such as HDFS, HBASE, a specific StorageManager should be 
implemented by inheriting this class.
- *
- */
-public abstract class StorageManager {
-  private final Log LOG = LogFactory.getLog(StorageManager.class);
-
-  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
-      Configuration.class,
-      Schema.class,
-      TableMeta.class,
-      Fragment.class
-  };
-
-  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
-      Configuration.class,
-      QueryUnitAttemptId.class,
-      Schema.class,
-      TableMeta.class,
-      Path.class
-  };
-
-  protected TajoConf conf;
-  protected StoreType storeType;
-
-  /**
-   * Cache of StorageManager.
-   * Key is manager key(warehouse path) + store type
-   */
-  private static final Map<String, StorageManager> storageManagers = 
Maps.newHashMap();
-
-  /**
-   * Cache of scanner handlers for each storage type.
-   */
-  protected static final Map<String, Class<? extends Scanner>> 
SCANNER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends Scanner>>();
-
-  /**
-   * Cache of appender handlers for each storage type.
-   */
-  protected static final Map<String, Class<? extends Appender>> 
APPENDER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends Appender>>();
-
-  /**
-   * Cache of constructors for each class. Pins the classes so they
-   * can't be garbage collected until ReflectionUtils can be collected.
-   */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
-      new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
-  public StorageManager(StoreType storeType) {
-    this.storeType = storeType;
-  }
-
-  /**
-   * Initialize storage manager.
-   * @throws IOException
-   */
-  protected abstract void storageInit() throws IOException;
-
-  /**
-   * This method is called after executing "CREATE TABLE" statement.
-   * If a storage is a file based storage, a storage manager may create 
directory.
-   *
-   * @param tableDesc Table description which is created.
-   * @param ifNotExists Creates the table only when the table does not exist.
-   * @throws IOException
-   */
-  public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) 
throws IOException;
-
-  /**
-   * This method is called after executing "DROP TABLE" statement with the 
'PURGE' option
-   * which is the option to delete all the data.
-   *
-   * @param tableDesc
-   * @throws IOException
-   */
-  public abstract void purgeTable(TableDesc tableDesc) throws IOException;
-
-  /**
-   * Returns the splits that will serve as input for the scan tasks. The
-   * number of splits matches the number of regions in a table.
-   * @param fragmentId The table name or previous ExecutionBlockId
-   * @param tableDesc The table description for the target data.
-   * @param scanNode The logical node for scanning.
-   * @return The list of input fragments.
-   * @throws IOException
-   */
-  public abstract List<Fragment> getSplits(String fragmentId, TableDesc 
tableDesc,
-                                           ScanNode scanNode) throws 
IOException;
-
-  /**
-   * It returns the splits that will serve as input for the non-forward query 
scanner such as 'select * from table1'.
-   * The result list should be small. If there is many fragments for scanning, 
TajoMaster uses the paging navigation.
-   * @param tableDesc The table description for the target data.
-   * @param currentPage The current page number within the entire list.
-   * @param numFragments The number of fragments in the result.
-   * @return The list of input fragments.
-   * @throws IOException
-   */
-  public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int 
currentPage, int numFragments)
-      throws IOException;
-
-  /**
-   * It returns the storage property.
-   * @return The storage property
-   */
-  public abstract StorageProperty getStorageProperty();
-
-  /**
-   * Release storage manager resource
-   */
-  public abstract void closeStorageManager();
-
-  /**
-   * It is called by a Repartitioner for range shuffling when the 
SortRangeType of SortNode is STORAGE_SPECIFIED.
-   * In general Repartitioner determines the partition range using previous 
output statistics data.
-   * In the special cases, such as HBase Repartitioner uses the result of this 
method.
-   *
-   * @param queryContext The current query context which contains query 
properties.
-   * @param tableDesc The table description for the target data.
-   * @param inputSchema The input schema
-   * @param sortSpecs The sort specification that contains the sort column and 
sort order.
-   * @return The list of sort ranges.
-   * @throws IOException
-   */
-  public abstract TupleRange[] getInsertSortRanges(OverridableConf 
queryContext, TableDesc tableDesc,
-                                                   Schema inputSchema, 
SortSpec[] sortSpecs,
-                                                   TupleRange dataRange) 
throws IOException;
-
-  /**
-   * This method is called before executing 'INSERT' or 'CREATE TABLE as 
SELECT'.
-   * In general Tajo creates the target table after finishing the final 
sub-query of CATS.
-   * But In the special cases, such as HBase INSERT or CAST query uses the 
target table information.
-   * That kind of the storage should implements the logic related to creating 
table in this method.
-   *
-   * @param node The child node of the root node.
-   * @throws IOException
-   */
-  public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException;
-
-  /**
-   * It is called when the query failed.
-   * Each storage manager should implement to be processed when the query 
fails in this method.
-   *
-   * @param node The child node of the root node.
-   * @throws IOException
-   */
-  public abstract void rollbackOutputCommit(LogicalNode node) throws 
IOException;
-
-  /**
-   * Returns the current storage type.
-   * @return
-   */
-  public StoreType getStoreType() {
-    return storeType;
-  }
-
-  /**
-   * Initialize StorageManager instance. It should be called before using.
-   *
-   * @param tajoConf
-   * @throws IOException
-   */
-  public void init(TajoConf tajoConf) throws IOException {
-    this.conf = tajoConf;
-    storageInit();
-  }
-
-  /**
-   * Close StorageManager
-   * @throws IOException
-   */
-  public void close() throws IOException {
-    synchronized(storageManagers) {
-      for (StorageManager eachStorageManager: storageManagers.values()) {
-        eachStorageManager.closeStorageManager();
-      }
-    }
-  }
-
-  /**
-   * Returns the splits that will serve as input for the scan tasks. The
-   * number of splits matches the number of regions in a table.
-   *
-   * @param fragmentId The table name or previous ExecutionBlockId
-   * @param tableDesc The table description for the target data.
-   * @return The list of input fragments.
-   * @throws IOException
-   */
-  public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) 
throws IOException {
-    return getSplits(fragmentId, tableDesc, null);
-  }
-
-  /**
-   * Returns FileStorageManager instance.
-   *
-   * @param tajoConf Tajo system property.
-   * @return
-   * @throws IOException
-   */
-  public static FileStorageManager getFileStorageManager(TajoConf tajoConf) 
throws IOException {
-    return getFileStorageManager(tajoConf, null);
-  }
-
-  /**
-   * Returns FileStorageManager instance and sets WAREHOUSE_DIR property in 
tajoConf with warehousePath parameter.
-   *
-   * @param tajoConf Tajo system property.
-   * @param warehousePath The warehouse directory to be set in the tajoConf.
-   * @return
-   * @throws IOException
-   */
-  public static FileStorageManager getFileStorageManager(TajoConf tajoConf, 
Path warehousePath) throws IOException {
-    URI uri;
-    TajoConf copiedConf = new TajoConf(tajoConf);
-    if (warehousePath != null) {
-      copiedConf.setVar(ConfVars.WAREHOUSE_DIR, 
warehousePath.toUri().toString());
-    }
-    uri = TajoConf.getWarehouseDir(copiedConf).toUri();
-    String key = "file".equals(uri.getScheme()) ? "file" : uri.toString();
-    return (FileStorageManager) getStorageManager(copiedConf, StoreType.CSV, 
key);
-  }
-
-  /**
-   * Returns the proper StorageManager instance according to the storeType.
-   *
-   * @param tajoConf Tajo system property.
-   * @param storeType Storage type
-   * @return
-   * @throws IOException
-   */
-  public static StorageManager getStorageManager(TajoConf tajoConf, String 
storeType) throws IOException {
-    if ("HBASE".equals(storeType)) {
-      return getStorageManager(tajoConf, StoreType.HBASE);
-    } else {
-      return getStorageManager(tajoConf, StoreType.CSV);
-    }
-  }
-
-  /**
-   * Returns the proper StorageManager instance according to the storeType.
-   *
-   * @param tajoConf Tajo system property.
-   * @param storeType Storage type
-   * @return
-   * @throws IOException
-   */
-  public static StorageManager getStorageManager(TajoConf tajoConf, StoreType 
storeType) throws IOException {
-    return getStorageManager(tajoConf, storeType, null);
-  }
-
-  /**
-   * Returns the proper StorageManager instance according to the storeType
-   *
-   * @param tajoConf Tajo system property.
-   * @param storeType Storage type
-   * @param managerKey Key that can identify each storage manager(may be a 
path)
-   * @return
-   * @throws IOException
-   */
-  public static synchronized StorageManager getStorageManager (
-      TajoConf tajoConf, StoreType storeType, String managerKey) throws 
IOException {
-    synchronized (storageManagers) {
-      String storeKey = storeType + managerKey;
-      StorageManager manager = storageManagers.get(storeKey);
-      if (manager == null) {
-        switch (storeType) {
-          case HBASE:
-            manager = new HBaseStorageManager(storeType);
-            break;
-          default:
-            manager = new FileStorageManager(storeType);
-        }
-
-        manager.init(tajoConf);
-        storageManagers.put(storeKey, manager);
-      }
-
-      return manager;
-    }
-  }
-
-  /**
-   * Returns Scanner instance.
-   *
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
-   * @param target Columns which are selected.
-   * @return Scanner instance
-   * @throws IOException
-   */
-  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto 
fragment, Schema target) throws IOException {
-    return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), 
target);
-  }
-
-  /**
-   * Returns Scanner instance.
-   *
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
-   * @return Scanner instance
-   * @throws IOException
-   */
-  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) 
throws IOException {
-    return getScanner(meta, schema, fragment, schema);
-  }
-
-  /**
-   * Returns Scanner instance.
-   *
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
-   * @param target The output schema
-   * @return Scanner instance
-   * @throws IOException
-   */
-  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, 
Schema target) throws IOException {
-    if (fragment.isEmpty()) {
-      Scanner scanner = new NullScanner(conf, schema, meta, fragment);
-      scanner.setTarget(target.toArray());
-
-      return scanner;
-    }
-
-    Scanner scanner;
-
-    Class<? extends Scanner> scannerClass = 
getScannerClass(meta.getStoreType());
-    scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment);
-    if (scanner.isProjectable()) {
-      scanner.setTarget(target.toArray());
-    }
-
-    return scanner;
-  }
-
-  /**
-   * Returns Scanner instance.
-   *
-   * @param conf The system property
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
-   * @param target The output schema
-   * @return Scanner instance
-   * @throws IOException
-   */
-  public static synchronized SeekableScanner getSeekableScanner(
-      TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, 
Schema target) throws IOException {
-    return (SeekableScanner)getStorageManager(conf, 
meta.getStoreType()).getScanner(meta, schema, fragment, target);
-  }
-
-  /**
-   * Returns Scanner instance.
-   *
-   * @param conf The system property
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param path The data file path
-   * @return Scanner instance
-   * @throws IOException
-   */
-  public static synchronized SeekableScanner getSeekableScanner(
-      TajoConf conf, TableMeta meta, Schema schema, Path path) throws 
IOException {
-
-    FileSystem fs = path.getFileSystem(conf);
-    FileStatus status = fs.getFileStatus(path);
-    FileFragment fragment = new FileFragment(path.getName(), path, 0, 
status.getLen());
-
-    return getSeekableScanner(conf, meta, schema, fragment, schema);
-  }
-
-  /**
-   * Returns Appender instance.
-   * @param queryContext Query property.
-   * @param taskAttemptId Task id.
-   * @param meta Table meta data.
-   * @param schema Output schema.
-   * @param workDir Working directory
-   * @return Appender instance
-   * @throws IOException
-   */
-  public Appender getAppender(OverridableConf queryContext,
-                              QueryUnitAttemptId taskAttemptId, TableMeta 
meta, Schema schema, Path workDir)
-      throws IOException {
-    Appender appender;
-
-    Class<? extends Appender> appenderClass;
-
-    String handlerName = meta.getStoreType().name().toLowerCase();
-    appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
-    if (appenderClass == null) {
-      appenderClass = conf.getClass(
-          String.format("tajo.storage.appender-handler.%s.class",
-              meta.getStoreType().name().toLowerCase()), null, Appender.class);
-      APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
-    }
-
-    if (appenderClass == null) {
-      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
-    }
-
-    appender = newAppenderInstance(appenderClass, conf, taskAttemptId, meta, 
schema, workDir);
-
-    return appender;
-  }
-
-  /**
-   * Creates a scanner instance.
-   *
-   * @param theClass Concrete class of scanner
-   * @param conf System property
-   * @param schema Input schema
-   * @param meta Table meta data
-   * @param fragment The fragment for scanning
-   * @param <T>
-   * @return The scanner instance
-   */
-  public static <T> T newScannerInstance(Class<T> theClass, Configuration 
conf, Schema schema, TableMeta meta,
-                                         Fragment fragment) {
-    T result;
-    try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-
-  /**
-   * Creates a scanner instance.
-   *
-   * @param theClass Concrete class of scanner
-   * @param conf System property
-   * @param taskAttemptId Task id
-   * @param meta Table meta data
-   * @param schema Input schema
-   * @param workDir Working directory
-   * @param <T>
-   * @return The scanner instance
-   */
-  public static <T> T newAppenderInstance(Class<T> theClass, Configuration 
conf, QueryUnitAttemptId taskAttemptId,
-                                          TableMeta meta, Schema schema, Path 
workDir) {
-    T result;
-    try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, 
meta, workDir});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-
-  /**
-   * Return the Scanner class for the StoreType that is defined in 
storage-default.xml.
-   *
-   * @param storeType store type
-   * @return The Scanner class
-   * @throws IOException
-   */
-  public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType 
storeType) throws IOException {
-    String handlerName = storeType.name().toLowerCase();
-    Class<? extends Scanner> scannerClass = 
SCANNER_HANDLER_CACHE.get(handlerName);
-    if (scannerClass == null) {
-      scannerClass = conf.getClass(
-          
String.format("tajo.storage.scanner-handler.%s.class",storeType.name().toLowerCase()),
 null, Scanner.class);
-      SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
-    }
-
-    if (scannerClass == null) {
-      throw new IOException("Unknown Storage Type: " + storeType.name());
-    }
-
-    return scannerClass;
-  }
-
-  /**
-   * Return length of the fragment.
-   * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from 
the configuration.
-   *
-   * @param conf Tajo system property
-   * @param fragment Fragment
-   * @return
-   */
-  public static long getFragmentLength(TajoConf conf, Fragment fragment) {
-    if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
-      return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
-    } else {
-      return fragment.getLength();
-    }
-  }
-
-  /**
-   * It is called after making logical plan. Storage manager should verify the 
schema for inserting.
-   *
-   * @param tableDesc The table description of insert target.
-   * @param outSchema  The output schema of select query for inserting.
-   * @throws IOException
-   */
-  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) 
throws IOException {
-    // nothing to do
-  }
-
-  /**
-   * Returns the list of storage specified rewrite rules.
-   * This values are used by LogicalOptimizer.
-   *
-   * @param queryContext The query property
-   * @param tableDesc The description of the target table.
-   * @return The list of storage specified rewrite rules
-   * @throws IOException
-   */
-  public List<RewriteRule> getRewriteRules(OverridableConf queryContext, 
TableDesc tableDesc) throws IOException {
-    return null;
-  }
-
-  /**
-   * Finalizes result data. Tajo stores result data in the staging directory.
-   * If the query fails, clean up the staging directory.
-   * Otherwise the query is successful, move to the final directory from the 
staging directory.
-   *
-   * @param queryContext The query property
-   * @param finalEbId The final execution block id
-   * @param plan The query plan
-   * @param schema The final output schema
-   * @param tableDesc The description of the target table
-   * @return Saved path
-   * @throws IOException
-   */
-  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId 
finalEbId,
-                               LogicalPlan plan, Schema schema,
-                               TableDesc tableDesc) throws IOException {
-    return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, 
true);
-  }
-
-  /**
-   * Finalizes result data. Tajo stores result data in the staging directory.
-   * If the query fails, clean up the staging directory.
-   * Otherwise the query is successful, move to the final directory from the 
staging directory.
-   *
-   * @param queryContext The query property
-   * @param finalEbId The final execution block id
-   * @param plan The query plan
-   * @param schema The final output schema
-   * @param tableDesc The description of the target table
-   * @param changeFileSeq If true change result file name with max sequence.
-   * @return Saved path
-   * @throws IOException
-   */
-  protected Path commitOutputData(OverridableConf queryContext, 
ExecutionBlockId finalEbId,
-                               LogicalPlan plan, Schema schema,
-                               TableDesc tableDesc, boolean changeFileSeq) 
throws IOException {
-    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
-    Path stagingResultDir = new Path(stagingDir, 
TajoConstants.RESULT_DIR_NAME);
-    Path finalOutputDir;
-    if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
-      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
-      FileSystem fs = stagingResultDir.getFileSystem(conf);
-
-      if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT 
OVERWRITE INTO
-
-        // It moves the original table into the temporary location.
-        // Then it moves the new result table into the original table location.
-        // Upon failed, it recovers the original table if possible.
-        boolean movedToOldTable = false;
-        boolean committed = false;
-        Path oldTableDir = new Path(stagingDir, 
TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
-
-        if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
-          // This is a map for existing non-leaf directory to rename. A key is 
current directory and a value is
-          // renaming directory.
-          Map<Path, Path> renameDirs = TUtil.newHashMap();
-          // This is a map for recovering existing partition directory. A key 
is current directory and a value is
-          // temporary directory to back up.
-          Map<Path, Path> recoveryDirs = TUtil.newHashMap();
-
-          try {
-            if (!fs.exists(finalOutputDir)) {
-              fs.mkdirs(finalOutputDir);
-            }
-
-            visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, 
stagingResultDir.toString(),
-                renameDirs, oldTableDir);
-
-            // Rename target partition directories
-            for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-              // Backup existing data files for recovering
-              if (fs.exists(entry.getValue())) {
-                String recoveryPathString = 
entry.getValue().toString().replaceAll(finalOutputDir.toString(),
-                    oldTableDir.toString());
-                Path recoveryPath = new Path(recoveryPathString);
-                fs.rename(entry.getValue(), recoveryPath);
-                fs.exists(recoveryPath);
-                recoveryDirs.put(entry.getValue(), recoveryPath);
-              }
-              // Delete existing directory
-              fs.delete(entry.getValue(), true);
-              // Rename staging directory to final output directory
-              fs.rename(entry.getKey(), entry.getValue());
-            }
-
-          } catch (IOException ioe) {
-            // Remove created dirs
-            for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-              fs.delete(entry.getValue(), true);
-            }
-
-            // Recovery renamed dirs
-            for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
-              fs.delete(entry.getValue(), true);
-              fs.rename(entry.getValue(), entry.getKey());
-            }
-            throw new IOException(ioe.getMessage());
-          }
-        } else {
-          try {
-            if (fs.exists(finalOutputDir)) {
-              fs.rename(finalOutputDir, oldTableDir);
-              movedToOldTable = fs.exists(oldTableDir);
-            } else { // if the parent does not exist, make its parent 
directory.
-              fs.mkdirs(finalOutputDir.getParent());
-            }
-
-            fs.rename(stagingResultDir, finalOutputDir);
-            committed = fs.exists(finalOutputDir);
-          } catch (IOException ioe) {
-            // recover the old table
-            if (movedToOldTable && !committed) {
-              fs.rename(oldTableDir, finalOutputDir);
-            }
-          }
-        }
-      } else {
-        String queryType = queryContext.get(QueryVars.COMMAND_TYPE);
-
-        if (queryType != null && queryType.equals(NodeType.INSERT.name())) { 
// INSERT INTO an existing table
-
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(3);
-
-          if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
-            for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-              if (eachFile.isFile()) {
-                LOG.warn("Partition table can't have file in a staging dir: " 
+ eachFile.getPath());
-                continue;
-              }
-              moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputDir, fmt, -1, changeFileSeq);
-            }
-          } else {
-            int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, 
false) + 1;
-            for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-              if (eachFile.getPath().getName().startsWith("_")) {
-                continue;
-              }
-              moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputDir, fmt, maxSeq++, changeFileSeq);
-            }
-          }
-          // checking all file moved and remove empty dir
-          verifyAllFileMoved(fs, stagingResultDir);
-          FileStatus[] files = fs.listStatus(stagingResultDir);
-          if (files != null && files.length != 0) {
-            for (FileStatus eachFile: files) {
-              LOG.error("There are some unmoved files in staging dir:" + 
eachFile.getPath());
-            }
-          }
-        } else { // CREATE TABLE AS SELECT (CTAS)
-          fs.rename(stagingResultDir, finalOutputDir);
-          LOG.info("Moved from the staging dir to the output directory '" + 
finalOutputDir);
-        }
-      }
-    } else {
-      finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-    }
-
-    return finalOutputDir;
-  }
-
-  /**
-   * Attach the sequence number to the output file name and than move the file 
into the final result path.
-   *
-   * @param fs FileSystem
-   * @param stagingResultDir The staging result dir
-   * @param fileStatus The file status
-   * @param finalOutputPath Final output path
-   * @param nf Number format
-   * @param fileSeq The sequence number
-   * @throws IOException
-   */
-  private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
-                                          FileStatus fileStatus, Path 
finalOutputPath,
-                                          NumberFormat nf,
-                                          int fileSeq, boolean changeFileSeq) 
throws IOException {
-    if (fileStatus.isDirectory()) {
-      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-      if (subPath != null) {
-        Path finalSubPath = new Path(finalOutputPath, subPath);
-        if (!fs.exists(finalSubPath)) {
-          fs.mkdirs(finalSubPath);
-        }
-        int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
-        for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
-          if (eachFile.getPath().getName().startsWith("_")) {
-            continue;
-          }
-          moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputPath, nf, ++maxSeq, changeFileSeq);
-        }
-      } else {
-        throw new IOException("Wrong staging dir:" + stagingResultDir + "," + 
fileStatus.getPath());
-      }
-    } else {
-      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-      if (subPath != null) {
-        Path finalSubPath = new Path(finalOutputPath, subPath);
-        if (changeFileSeq) {
-          finalSubPath = new Path(finalSubPath.getParent(), 
replaceFileNameSeq(finalSubPath, fileSeq, nf));
-        }
-        if (!fs.exists(finalSubPath.getParent())) {
-          fs.mkdirs(finalSubPath.getParent());
-        }
-        if (fs.exists(finalSubPath)) {
-          throw new IOException("Already exists data file:" + finalSubPath);
-        }
-        boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
-        if (success) {
-          LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
-              "to final output[" + finalSubPath + "]");
-        } else {
-          LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " 
+
-              "to final output[" + finalSubPath + "]");
-        }
-      }
-    }
-  }
-
-  /**
-   * Removes the path of the parent.
-   * @param parentPath
-   * @param childPath
-   * @return
-   */
-  private String extractSubPath(Path parentPath, Path childPath) {
-    String parentPathStr = parentPath.toUri().getPath();
-    String childPathStr = childPath.toUri().getPath();
-
-    if (parentPathStr.length() > childPathStr.length()) {
-      return null;
-    }
-
-    int index = childPathStr.indexOf(parentPathStr);
-    if (index != 0) {
-      return null;
-    }
-
-    return childPathStr.substring(parentPathStr.length() + 1);
-  }
-
-  /**
-   * Attach the sequence number to a path.
-   *
-   * @param path Path
-   * @param seq sequence number
-   * @param nf Number format
-   * @return New path attached with sequence number
-   * @throws IOException
-   */
-  private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) 
throws IOException {
-    String[] tokens = path.getName().split("-");
-    if (tokens.length != 4) {
-      throw new IOException("Wrong result file name:" + path);
-    }
-    return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + 
nf.format(seq);
-  }
-
-  /**
-   * Make sure all files are moved.
-   * @param fs FileSystem
-   * @param stagingPath The stagind directory
-   * @return
-   * @throws IOException
-   */
-  private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws 
IOException {
-    FileStatus[] files = fs.listStatus(stagingPath);
-    if (files != null && files.length != 0) {
-      for (FileStatus eachFile: files) {
-        if (eachFile.isFile()) {
-          LOG.error("There are some unmoved files in staging dir:" + 
eachFile.getPath());
-          return false;
-        } else {
-          if (verifyAllFileMoved(fs, eachFile.getPath())) {
-            fs.delete(eachFile.getPath(), false);
-          } else {
-            return false;
-          }
-        }
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * This method sets a rename map which includes renamed staging directory to 
final output directory recursively.
-   * If there exists some data files, this delete it for duplicate data.
-   *
-   *
-   * @param fs
-   * @param stagingPath
-   * @param outputPath
-   * @param stagingParentPathString
-   * @throws IOException
-   */
-  private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path 
outputPath,
-                                         String stagingParentPathString,
-                                         Map<Path, Path> renameDirs, Path 
oldTableDir) throws IOException {
-    FileStatus[] files = fs.listStatus(stagingPath);
-
-    for(FileStatus eachFile : files) {
-      if (eachFile.isDirectory()) {
-        Path oldPath = eachFile.getPath();
-
-        // Make recover directory.
-        String recoverPathString = 
oldPath.toString().replaceAll(stagingParentPathString,
-            oldTableDir.toString());
-        Path recoveryPath = new Path(recoverPathString);
-        if (!fs.exists(recoveryPath)) {
-          fs.mkdirs(recoveryPath);
-        }
-
-        visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, 
stagingParentPathString,
-            renameDirs, oldTableDir);
-        // Find last order partition for renaming
-        String newPathString = 
oldPath.toString().replaceAll(stagingParentPathString,
-            outputPath.toString());
-        Path newPath = new Path(newPathString);
-        if (!isLeafDirectory(fs, eachFile.getPath())) {
-          renameDirs.put(eachFile.getPath(), newPath);
-        } else {
-          if (!fs.exists(newPath)) {
-            fs.mkdirs(newPath);
-          }
-        }
-      }
-    }
-  }
-
-  private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException 
{
-    boolean retValue = false;
-
-    FileStatus[] files = fs.listStatus(path);
-    for (FileStatus file : files) {
-      if (fs.isDirectory(file.getPath())) {
-        retValue = true;
-        break;
-      }
-    }
-
-    return retValue;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java
deleted file mode 100644
index 6816d08..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-public class StorageProperty {
-  private boolean supportsInsertInto;
-  private boolean sortedInsert;
-
-  public boolean isSupportsInsertInto() {
-    return supportsInsertInto;
-  }
-
-  public void setSupportsInsertInto(boolean supportsInsertInto) {
-    this.supportsInsertInto = supportsInsertInto;
-  }
-
-  public boolean isSortedInsert() {
-    return sortedInsert;
-  }
-
-  public void setSortedInsert(boolean sortedInsert) {
-    this.sortedInsert = sortedInsert;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
deleted file mode 100644
index 4a66678..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.KeyValueSet;
-import parquet.hadoop.ParquetOutputFormat;
-import sun.nio.ch.DirectBuffer;
-
-import java.io.DataInput;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-public class StorageUtil extends StorageConstants {
-  public static int getRowByteSize(Schema schema) {
-    int sum = 0;
-    for(Column col : schema.getColumns()) {
-      sum += StorageUtil.getColByteSize(col);
-    }
-
-    return sum;
-  }
-
-  public static int getColByteSize(Column col) {
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-        return 1;
-      case CHAR:
-        return 1;
-      case BIT:
-        return 1;
-      case INT2:
-        return 2;
-      case INT4:
-        return 4;
-      case INT8:
-        return 8;
-      case FLOAT4:
-        return 4;
-      case FLOAT8:
-        return 8;
-      case INET4:
-        return 4;
-      case INET6:
-        return 32;
-      case TEXT:
-        return 256;
-      case BLOB:
-        return 256;
-      case DATE:
-        return 4;
-      case TIME:
-        return 8;
-      case TIMESTAMP:
-        return 8;
-      default:
-        return 0;
-    }
-  }
-
-  public static void writeTableMeta(Configuration conf, Path tableroot, 
TableMeta meta) throws IOException {
-    FileSystem fs = tableroot.getFileSystem(conf);
-    FSDataOutputStream out = fs.create(new Path(tableroot, ".meta"));
-    FileUtil.writeProto(out, meta.getProto());
-    out.flush();
-    out.close();
-  }
-  
-  public static Path concatPath(String parent, String...childs) {
-    return concatPath(new Path(parent), childs);
-  }
-  
-  public static Path concatPath(Path parent, String...childs) {
-    StringBuilder sb = new StringBuilder();
-    
-    for(int i=0; i < childs.length; i++) {      
-      sb.append(childs[i]);
-      if(i < childs.length - 1)
-        sb.append("/");
-    }
-    
-    return new Path(parent, sb.toString());
-  }
-
-  public static KeyValueSet newPhysicalProperties(CatalogProtos.StoreType 
type) {
-    KeyValueSet options = new KeyValueSet();
-    if (CatalogProtos.StoreType.CSV == type) {
-      options.set(CSVFILE_DELIMITER, DEFAULT_FIELD_DELIMITER);
-    } else if (CatalogProtos.StoreType.RCFILE == type) {
-      options.set(RCFILE_SERDE, DEFAULT_BINARY_SERDE);
-    } else if (CatalogProtos.StoreType.SEQUENCEFILE == type) {
-      options.set(SEQUENCEFILE_SERDE, DEFAULT_TEXT_SERDE);
-      options.set(SEQUENCEFILE_DELIMITER, DEFAULT_FIELD_DELIMITER);
-    } else if (type == CatalogProtos.StoreType.PARQUET) {
-      options.set(ParquetOutputFormat.BLOCK_SIZE, PARQUET_DEFAULT_BLOCK_SIZE);
-      options.set(ParquetOutputFormat.PAGE_SIZE, PARQUET_DEFAULT_PAGE_SIZE);
-      options.set(ParquetOutputFormat.COMPRESSION, 
PARQUET_DEFAULT_COMPRESSION_CODEC_NAME);
-      options.set(ParquetOutputFormat.ENABLE_DICTIONARY, 
PARQUET_DEFAULT_IS_DICTIONARY_ENABLED);
-      options.set(ParquetOutputFormat.VALIDATION, 
PARQUET_DEFAULT_IS_VALIDATION_ENABLED);
-    }
-
-    return options;
-  }
-
-  static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*";
-  static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*";
-
-  /**
-   * Written files can be one of two forms: "part-[0-9]*-[0-9]*" or 
"part-[0-9]*-[0-9]*-[0-9]*".
-   *
-   * This method finds the maximum sequence number from existing data files 
through the above patterns.
-   * If it cannot find any matched file or the maximum number, it will return 
-1.
-   *
-   * @param fs
-   * @param path
-   * @param recursive
-   * @return The maximum sequence number
-   * @throws java.io.IOException
-   */
-  public static int getMaxFileSequence(FileSystem fs, Path path, boolean 
recursive) throws IOException {
-    if (!fs.isDirectory(path)) {
-      return -1;
-    }
-
-    FileStatus[] files = fs.listStatus(path);
-
-    if (files == null || files.length == 0) {
-      return -1;
-    }
-
-    int maxValue = -1;
-    List<Path> fileNamePatternMatchedList = new ArrayList<Path>();
-
-    for (FileStatus eachFile: files) {
-      // In the case of partition table, return largest value within all 
partition dirs.
-      if (eachFile.isDirectory() && recursive) {
-        int value = getMaxFileSequence(fs, eachFile.getPath(), recursive);
-        if (value > maxValue) {
-          maxValue = value;
-        }
-      } else {
-        if (eachFile.getPath().getName().matches(fileNamePatternV08) ||
-            eachFile.getPath().getName().matches(fileNamePatternV09)) {
-          fileNamePatternMatchedList.add(eachFile.getPath());
-        }
-      }
-    }
-
-    if (fileNamePatternMatchedList.isEmpty()) {
-      return maxValue;
-    }
-    Path lastFile = 
fileNamePatternMatchedList.get(fileNamePatternMatchedList.size() - 1);
-    String pathName = lastFile.getName();
-
-    // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>
-    // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence>
-    String[] pathTokens = pathName.split("-");
-    if (pathTokens.length == 3) {
-      return -1;
-    } else if(pathTokens.length == 4) {
-      return Integer.parseInt(pathTokens[3]);
-    } else {
-      return -1;
-    }
-  }
-
-  public static void closeBuffer(ByteBuffer buffer) {
-    if (buffer != null) {
-      if (buffer.isDirect()) {
-        ((DirectBuffer) buffer).cleaner().clean();
-      } else {
-        buffer.clear();
-      }
-    }
-  }
-
-  public static int readFully(InputStream is, byte[] buffer, int offset, int 
length)
-      throws IOException {
-    int nread = 0;
-    while (nread < length) {
-      int nbytes = is.read(buffer, offset + nread, length - nread);
-      if (nbytes < 0) {
-        return nread > 0 ? nread : nbytes;
-      }
-      nread += nbytes;
-    }
-    return nread;
-  }
-
-  /**
-   * Similar to readFully(). Skips bytes in a loop.
-   * @param in The DataInput to skip bytes from
-   * @param len number of bytes to skip.
-   * @throws java.io.IOException if it could not skip requested number of bytes
-   * for any reason (including EOF)
-   */
-  public static void skipFully(DataInput in, int len) throws IOException {
-    int amt = len;
-    while (amt > 0) {
-      long ret = in.skipBytes(amt);
-      if (ret == 0) {
-        // skip may return 0 even if we're not at EOF.  Luckily, we can
-        // use the read() method to figure out if we're at the end.
-        int b = in.readByte();
-        if (b == -1) {
-          throw new EOFException( "Premature EOF from inputStream after " +
-              "skipping " + (len - amt) + " byte(s).");
-        }
-        ret = 1;
-      }
-      amt -= ret;
-    }
-  }
-
-  public static boolean isFileStorageType(StoreType storageType) {
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
deleted file mode 100644
index a2c08de..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-
-/**
- * This class is not thread-safe.
- */
-public class TableStatistics {
-  private static final Log LOG = LogFactory.getLog(TableStatistics.class);
-  private Schema schema;
-  private Tuple minValues;
-  private Tuple maxValues;
-  private long [] numNulls;
-  private long numRows = 0;
-  private long numBytes = 0;
-
-  private boolean [] comparable;
-
-  public TableStatistics(Schema schema) {
-    this.schema = schema;
-    minValues = new VTuple(schema.size());
-    maxValues = new VTuple(schema.size());
-
-    numNulls = new long[schema.size()];
-    comparable = new boolean[schema.size()];
-
-    DataType type;
-    for (int i = 0; i < schema.size(); i++) {
-      type = schema.getColumn(i).getDataType();
-      if (type.getType() == Type.PROTOBUF) {
-        comparable[i] = false;
-      } else {
-        comparable[i] = true;
-      }
-    }
-  }
-
-  public Schema getSchema() {
-    return this.schema;
-  }
-
-  public void incrementRow() {
-    numRows++;
-  }
-
-  public long getNumRows() {
-    return this.numRows;
-  }
-
-  public void setNumBytes(long bytes) {
-    this.numBytes = bytes;
-  }
-
-  public long getNumBytes() {
-    return this.numBytes;
-  }
-
-  public void analyzeField(int idx, Datum datum) {
-    if (datum instanceof NullDatum) {
-      numNulls[idx]++;
-      return;
-    }
-
-    if (comparable[idx]) {
-      if (!maxValues.contains(idx) ||
-          maxValues.get(idx).compareTo(datum) < 0) {
-        maxValues.put(idx, datum);
-      }
-      if (!minValues.contains(idx) ||
-          minValues.get(idx).compareTo(datum) > 0) {
-        minValues.put(idx, datum);
-      }
-    }
-  }
-
-  public TableStats getTableStat() {
-    TableStats stat = new TableStats();
-
-    ColumnStats columnStats;
-    for (int i = 0; i < schema.size(); i++) {
-      columnStats = new ColumnStats(schema.getColumn(i));
-      columnStats.setNumNulls(numNulls[i]);
-      if (minValues.get(i) == null || 
schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) {
-        columnStats.setMinValue(minValues.get(i));
-      } else {
-        LOG.warn("Wrong statistics column type (" + minValues.get(i).type() +
-            ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
-      }
-      if (maxValues.get(i) == null || 
schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) {
-        columnStats.setMaxValue(maxValues.get(i));
-      } else {
-        LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() +
-            ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
-      }
-      stat.addColumnStat(columnStats);
-    }
-
-    stat.setNumRows(this.numRows);
-    stat.setNumBytes(this.numBytes);
-
-    return stat;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
deleted file mode 100644
index 094d285..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.protobuf.Message;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.NumberUtil;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-//Compatibility with Apache Hive
-@Deprecated
-public class TextSerializerDeserializer implements SerializerDeserializer {
-  public static final byte[] trueBytes = "true".getBytes();
-  public static final byte[] falseBytes = "false".getBytes();
-  private ProtobufJsonFormat protobufJsonFormat = 
ProtobufJsonFormat.getInstance();
-
-
-  @Override
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] 
nullCharacters) throws IOException {
-
-    byte[] bytes;
-    int length = 0;
-    TajoDataTypes.DataType dataType = col.getDataType();
-
-    if (datum == null || datum instanceof NullDatum) {
-      switch (dataType.getType()) {
-        case CHAR:
-        case TEXT:
-          length = nullCharacters.length;
-          out.write(nullCharacters);
-          break;
-        default:
-          break;
-      }
-      return length;
-    }
-
-    switch (dataType.getType()) {
-      case BOOLEAN:
-        out.write(datum.asBool() ? trueBytes : falseBytes);
-        length = trueBytes.length;
-        break;
-      case CHAR:
-        byte[] pad = new byte[dataType.getLength() - datum.size()];
-        bytes = datum.asTextBytes();
-        out.write(bytes);
-        out.write(pad);
-        length = bytes.length + pad.length;
-        break;
-      case TEXT:
-      case BIT:
-      case INT2:
-      case INT4:
-      case INT8:
-      case FLOAT4:
-      case FLOAT8:
-      case INET4:
-      case DATE:
-      case INTERVAL:
-        bytes = datum.asTextBytes();
-        length = bytes.length;
-        out.write(bytes);
-        break;
-      case TIME:
-        bytes = ((TimeDatum)datum).asChars(TajoConf.getCurrentTimeZone(), 
true).getBytes();
-        length = bytes.length;
-        out.write(bytes);
-        break;
-      case TIMESTAMP:
-        bytes = ((TimestampDatum)datum).asChars(TajoConf.getCurrentTimeZone(), 
true).getBytes();
-        length = bytes.length;
-        out.write(bytes);
-        break;
-      case INET6:
-      case BLOB:
-        bytes = Base64.encodeBase64(datum.asByteArray(), false);
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case PROTOBUF:
-        ProtobufDatum protobuf = (ProtobufDatum) datum;
-        byte[] protoBytes = 
protobufJsonFormat.printToString(protobuf.get()).getBytes();
-        length = protoBytes.length;
-        out.write(protoBytes, 0, protoBytes.length);
-        break;
-      case NULL_TYPE:
-      default:
-        break;
-    }
-    return length;
-  }
-
-  @Override
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, 
byte[] nullCharacters) throws IOException {
-
-    Datum datum;
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 
'T');
-        break;
-      case BIT:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, 
length)));
-        break;
-      case CHAR:
-        datum = isNullText(bytes, offset, length, nullCharacters) ? 
NullDatum.get()
-            : DatumFactory.createChar(new String(bytes, offset, 
length).trim());
-        break;
-      case INT1:
-      case INT2:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInt2((short) NumberUtil.parseInt(bytes, 
offset, length));
-        break;
-      case INT4:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInt4(NumberUtil.parseInt(bytes, offset, 
length));
-        break;
-      case INT8:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInt8(new String(bytes, offset, length));
-        break;
-      case FLOAT4:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createFloat4(new String(bytes, offset, length));
-        break;
-      case FLOAT8:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, offset, 
length));
-        break;
-      case TEXT: {
-        byte[] chars = new byte[length];
-        System.arraycopy(bytes, offset, chars, 0, length);
-        datum = isNullText(bytes, offset, length, nullCharacters) ? 
NullDatum.get()
-            : DatumFactory.createText(chars);
-        break;
-      }
-      case DATE:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createDate(new String(bytes, offset, length));
-        break;
-      case TIME:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createTime(new String(bytes, offset, length));
-        break;
-      case TIMESTAMP:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createTimestamp(new String(bytes, offset, length));
-        break;
-      case INTERVAL:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInterval(new String(bytes, offset, length));
-        break;
-      case PROTOBUF: {
-        if (isNull(bytes, offset, length, nullCharacters)) {
-          datum = NullDatum.get();
-        } else {
-          ProtobufDatumFactory factory = 
ProtobufDatumFactory.get(col.getDataType());
-          Message.Builder builder = factory.newBuilder();
-          try {
-            byte[] protoBytes = new byte[length];
-            System.arraycopy(bytes, offset, protoBytes, 0, length);
-            protobufJsonFormat.merge(protoBytes, builder);
-            datum = factory.createDatum(builder.build());
-          } catch (IOException e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-        }
-        break;
-      }
-      case INET4:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInet4(new String(bytes, offset, length));
-        break;
-      case BLOB: {
-        if (isNull(bytes, offset, length, nullCharacters)) {
-          datum = NullDatum.get();
-        } else {
-          byte[] blob = new byte[length];
-          System.arraycopy(bytes, offset, blob, 0, length);
-          datum = DatumFactory.createBlob(Base64.decodeBase64(blob));
-        }
-        break;
-      }
-      default:
-        datum = NullDatum.get();
-        break;
-    }
-    return datum;
-  }
-
-  private static boolean isNull(byte[] val, int offset, int length, byte[] 
nullBytes) {
-    return length == 0 || ((length == nullBytes.length)
-        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length));
-  }
-
-  private static boolean isNullText(byte[] val, int offset, int length, byte[] 
nullBytes) {
-    return length > 0 && length == nullBytes.length
-        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
deleted file mode 100644
index 8dffd8d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.common.ProtoObject;
-
-import java.util.Comparator;
-
-import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
-
-public abstract class TupleComparator implements Comparator<Tuple>, 
ProtoObject<TupleComparatorProto> {
-
-  public abstract int compare(Tuple o1, Tuple o2);
-
-  public abstract boolean isAscendingFirstKey();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
deleted file mode 100644
index e824b99..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Objects;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-
-import java.util.Comparator;
-
-/**
- * It represents a pair of start and end tuples.
- */
-public class TupleRange implements Comparable<TupleRange>, Cloneable {
-  private Tuple start;
-  private Tuple end;
-  private final TupleComparator comp;
-
-  public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple 
end) {
-    this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), 
sortSpecs);
-    // if there is only one value, start == end
-    this.start = start;
-    this.end = end;
-  }
-
-  public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) {
-    Schema schema = new Schema();
-    for (SortSpec spec : sortSpecs) {
-      schema.addColumn(spec.getSortKey());
-    }
-
-    return schema;
-  }
-
-  public void setStart(Tuple tuple) {
-    this.start = tuple;
-  }
-
-  public final Tuple getStart() {
-    return this.start;
-  }
-
-  public void setEnd(Tuple tuple) {
-    this.end = tuple;
-  }
-
-  public final Tuple getEnd() {
-    return this.end;
-  }
-
-  public String toString() {
-    return "[" + this.start + ", " + this.end + ")";
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(start, end);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof TupleRange) {
-      TupleRange other = (TupleRange) obj;
-      return this.start.equals(other.start) && this.end.equals(other.end);
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public int compareTo(TupleRange o) {
-    // TODO - should handle overlap
-    int cmpVal = comp.compare(this.start, o.start);
-    if (cmpVal != 0) {
-      return cmpVal;
-    } else {
-      return comp.compare(this.end, o.end);
-    }
-  }
-
-  public static class DescendingTupleRangeComparator
-      implements Comparator<TupleRange> {
-
-    @Override
-    public int compare(TupleRange left, TupleRange right) {
-      return right.compareTo(left);
-    }
-  }
-
-  public TupleRange clone() throws CloneNotSupportedException {
-    TupleRange newRange = (TupleRange) super.clone();
-    newRange.setStart(start.clone());
-    newRange.setEnd(end.clone());
-    return newRange;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
deleted file mode 100644
index ad19101..0000000
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.annotation;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
/**
 * Runtime marker annotation for storage-format classes.
 *
 * NOTE(review): judging by the name, this presumably marks store
 * implementations whose data files can be split for parallel scanning —
 * confirm against the code that reflects on this annotation.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ForSplitableStore {
}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
deleted file mode 100644
index 9e1e7ea..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.avro;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.FileAppender;
-import org.apache.tajo.storage.TableStatistics;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-/**
- * FileAppender for writing to Avro files.
- */
-public class AvroAppender extends FileAppender {
-  private TableStatistics stats;
-  private Schema avroSchema;
-  private List<Schema.Field> avroFields;
-  private DataFileWriter<GenericRecord> dataFileWriter;
-
-  /**
-   * Creates a new AvroAppender.
-   *
-   * @param conf Configuration properties.
-   * @param schema The table schema.
-   * @param meta The table metadata.
-   * @param workDir The path of the Parquet file to write to.
-   */
-  public AvroAppender(Configuration conf,
-                      QueryUnitAttemptId taskAttemptId,
-                      org.apache.tajo.catalog.Schema schema,
-                      TableMeta meta, Path workDir) throws IOException {
-    super(conf, taskAttemptId, schema, meta, workDir);
-  }
-
-  /**
-   * Initializes the Appender.
-   */
-  public void init() throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    if (!fs.exists(path.getParent())) {
-      throw new FileNotFoundException(path.toString());
-    }
-    FSDataOutputStream outputStream = fs.create(path);
-
-    avroSchema = AvroUtil.getAvroSchema(meta, conf);
-    avroFields = avroSchema.getFields();
-
-    DatumWriter<GenericRecord> datumWriter =
-        new GenericDatumWriter<GenericRecord>(avroSchema);
-    dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
-    dataFileWriter.create(avroSchema, outputStream);
-
-    if (enabledStats) {
-      this.stats = new TableStatistics(schema);
-    }
-    super.init();
-  }
-
-  /**
-   * Gets the current offset. Tracking offsets is currenly not implemented, so
-   * this method always returns 0.
-   *
-   * @return 0
-   */
-  @Override
-  public long getOffset() throws IOException {
-    return 0;
-  }
-
-  private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
-    if (tuple.get(i) instanceof NullDatum) {
-      return null;
-    }
-    switch (avroType) {
-      case NULL:
-        return null;
-      case BOOLEAN:
-        return tuple.getBool(i);
-      case INT:
-        return tuple.getInt4(i);
-      case LONG:
-        return tuple.getInt8(i);
-      case FLOAT:
-        return tuple.getFloat4(i);
-      case DOUBLE:
-        return tuple.getFloat8(i);
-      case BYTES:
-      case FIXED:
-        return ByteBuffer.wrap(tuple.getBytes(i));
-      case STRING:
-        return tuple.getText(i);
-      default:
-        throw new RuntimeException("Unknown primitive type.");
-    }
-  }
-
-  /**
-   * Write a Tuple to the Avro file.
-   *
-   * @param tuple The Tuple to write.
-   */
-  @Override
-  public void addTuple(Tuple tuple) throws IOException {
-    GenericRecord record = new GenericData.Record(avroSchema);
-    for (int i = 0; i < schema.size(); ++i) {
-      Column column = schema.getColumn(i);
-      if (enabledStats) {
-        stats.analyzeField(i, tuple.get(i));
-      }
-      Object value;
-      Schema.Field avroField = avroFields.get(i);
-      Schema.Type avroType = avroField.schema().getType();
-      switch (avroType) {
-        case NULL:
-        case BOOLEAN:
-        case INT:
-        case LONG:
-        case FLOAT:
-        case DOUBLE:
-        case BYTES:
-        case STRING:
-        case FIXED:
-          value = getPrimitive(tuple, i, avroType);
-          break;
-        case RECORD:
-          throw new RuntimeException("Avro RECORD not supported.");
-        case ENUM:
-          throw new RuntimeException("Avro ENUM not supported.");
-        case MAP:
-          throw new RuntimeException("Avro MAP not supported.");
-        case UNION:
-          List<Schema> schemas = avroField.schema().getTypes();
-          if (schemas.size() != 2) {
-            throw new RuntimeException("Avro UNION not supported.");
-          }
-          if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
-            value = getPrimitive(tuple, i, schemas.get(1).getType());
-          } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
-            value = getPrimitive(tuple, i, schemas.get(0).getType());
-          } else {
-            throw new RuntimeException("Avro UNION not supported.");
-          }
-          break;
-        default:
-          throw new RuntimeException("Unknown type: " + avroType);
-      }
-      record.put(i, value);
-    }
-    dataFileWriter.append(record);
-
-    if (enabledStats) {
-      stats.incrementRow();
-    }
-  }
-
-  /**
-   * Flushes the current state of the file.
-   */
-  @Override
-  public void flush() throws IOException {
-    dataFileWriter.flush();
-  }
-
-  /**
-   * Closes the Appender.
-   */
-  @Override
-  public void close() throws IOException {
-    dataFileWriter.close();
-  }
-
-  /**
-   * If table statistics is enabled, retrieve the table statistics.
-   *
-   * @return Table statistics if enabled or null otherwise.
-   */
-  @Override
-  public TableStats getStats() {
-    if (enabledStats) {
-      return stats.getTableStat();
-    } else {
-      return null;
-    }
-  }
-}

Reply via email to