http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
new file mode 100644
index 0000000..e2d89d6
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -0,0 +1,979 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * StorageManager manages the functions of storing and reading data.
+ * StorageManager is a abstract class.
+ * For supporting such as HDFS, HBASE, a specific StorageManager should be 
implemented by inheriting this class.
+ *
+ */
+public abstract class StorageManager {
+  private final Log LOG = LogFactory.getLog(StorageManager.class);
+
+  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+      Configuration.class,
+      Schema.class,
+      TableMeta.class,
+      Fragment.class
+  };
+
+  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+      Configuration.class,
+      QueryUnitAttemptId.class,
+      Schema.class,
+      TableMeta.class,
+      Path.class
+  };
+
+  public static final PathFilter hiddenFileFilter = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  protected TajoConf conf;
+  protected StoreType storeType;
+
+  /**
+   * Cache of StorageManager.
+   * Key is manager key(warehouse path) + store type
+   */
+  private static final Map<String, StorageManager> storageManagers = 
Maps.newHashMap();
+
+  /**
+   * Cache of scanner handlers for each storage type.
+   */
+  protected static final Map<String, Class<? extends Scanner>> 
SCANNER_HANDLER_CACHE
+      = new ConcurrentHashMap<String, Class<? extends Scanner>>();
+
+  /**
+   * Cache of appender handlers for each storage type.
+   */
+  protected static final Map<String, Class<? extends Appender>> 
APPENDER_HANDLER_CACHE
+      = new ConcurrentHashMap<String, Class<? extends Appender>>();
+
+  /**
+   * Cache of constructors for each class. Pins the classes so they
+   * can't be garbage collected until ReflectionUtils can be collected.
+   */
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
+      new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+  public StorageManager(StoreType storeType) {
+    this.storeType = storeType;
+  }
+
+  /**
+   * Initialize storage manager.
+   * @throws java.io.IOException
+   */
+  protected abstract void storageInit() throws IOException;
+
+  /**
+   * This method is called after executing "CREATE TABLE" statement.
+   * If a storage is a file based storage, a storage manager may create 
directory.
+   *
+   * @param tableDesc Table description which is created.
+   * @param ifNotExists Creates the table only when the table does not exist.
+   * @throws java.io.IOException
+   */
+  public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) 
throws IOException;
+
+  /**
+   * This method is called after executing "DROP TABLE" statement with the 
'PURGE' option
+   * which is the option to delete all the data.
+   *
+   * @param tableDesc
+   * @throws java.io.IOException
+   */
+  public abstract void purgeTable(TableDesc tableDesc) throws IOException;
+
+  /**
+   * Returns the splits that will serve as input for the scan tasks. The
+   * number of splits matches the number of regions in a table.
+   * @param fragmentId The table name or previous ExecutionBlockId
+   * @param tableDesc The table description for the target data.
+   * @param scanNode The logical node for scanning.
+   * @return The list of input fragments.
+   * @throws java.io.IOException
+   */
+  public abstract List<Fragment> getSplits(String fragmentId, TableDesc 
tableDesc,
+                                           ScanNode scanNode) throws 
IOException;
+
+  /**
+   * It returns the splits that will serve as input for the non-forward query 
scanner such as 'select * from table1'.
+   * The result list should be small. If there is many fragments for scanning, 
TajoMaster uses the paging navigation.
+   * @param tableDesc The table description for the target data.
+   * @param currentPage The current page number within the entire list.
+   * @param numFragments The number of fragments in the result.
+   * @return The list of input fragments.
+   * @throws java.io.IOException
+   */
+  public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int 
currentPage, int numFragments)
+      throws IOException;
+
+  /**
+   * It returns the storage property.
+   * @return The storage property
+   */
+  public abstract StorageProperty getStorageProperty();
+
+  /**
+   * Release storage manager resource
+   */
+  public abstract void closeStorageManager();
+
+  /**
+   * It is called by a Repartitioner for range shuffling when the 
SortRangeType of SortNode is USING_STORAGE_MANAGER.
+   * In general Repartitioner determines the partition range using previous 
output statistics data.
+   * In the special cases, such as HBase Repartitioner uses the result of this 
method.
+   *
+   * @param queryContext The current query context which contains query 
properties.
+   * @param tableDesc The table description for the target data.
+   * @param inputSchema The input schema
+   * @param sortSpecs The sort specification that contains the sort column and 
sort order.
+   * @return The list of sort ranges.
+   * @throws java.io.IOException
+   */
+  public abstract TupleRange[] getInsertSortRanges(OverridableConf 
queryContext, TableDesc tableDesc,
+                                                   Schema inputSchema, 
SortSpec[] sortSpecs,
+                                                   TupleRange dataRange) 
throws IOException;
+
+  /**
+   * This method is called before executing 'INSERT' or 'CREATE TABLE as 
SELECT'.
+   * In general Tajo creates the target table after finishing the final 
sub-query of CATS.
+   * But In the special cases, such as HBase INSERT or CAST query uses the 
target table information.
+   * That kind of the storage should implements the logic related to creating 
table in this method.
+   *
+   * @param node The child node of the root node.
+   * @throws java.io.IOException
+   */
+  public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException;
+
+  /**
+   * It is called when the query failed.
+   * Each storage manager should implement to be processed when the query 
fails in this method.
+   *
+   * @param node The child node of the root node.
+   * @throws java.io.IOException
+   */
+  public abstract void rollbackOutputCommit(LogicalNode node) throws 
IOException;
+
+  /**
+   * Returns the current storage type.
+   * @return
+   */
+  public StoreType getStoreType() {
+    return storeType;
+  }
+
+  /**
+   * Initialize StorageManager instance. It should be called before using.
+   *
+   * @param tajoConf
+   * @throws java.io.IOException
+   */
+  public void init(TajoConf tajoConf) throws IOException {
+    this.conf = tajoConf;
+    storageInit();
+  }
+
+  /**
+   * Close StorageManager
+   * @throws java.io.IOException
+   */
+  public void close() throws IOException {
+    synchronized(storageManagers) {
+      for (StorageManager eachStorageManager: storageManagers.values()) {
+        eachStorageManager.closeStorageManager();
+      }
+    }
+  }
+
+  /**
+   * Returns the splits that will serve as input for the scan tasks. The
+   * number of splits matches the number of regions in a table.
+   *
+   * @param fragmentId The table name or previous ExecutionBlockId
+   * @param tableDesc The table description for the target data.
+   * @return The list of input fragments.
+   * @throws java.io.IOException
+   */
+  public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) 
throws IOException {
+    return getSplits(fragmentId, tableDesc, null);
+  }
+
+  /**
+   * Returns FileStorageManager instance.
+   *
+   * @param tajoConf Tajo system property.
+   * @return
+   * @throws java.io.IOException
+   */
+  public static StorageManager getFileStorageManager(TajoConf tajoConf) throws 
IOException {
+    return getFileStorageManager(tajoConf, null);
+  }
+
+  /**
+   * Returns FileStorageManager instance and sets WAREHOUSE_DIR property in 
tajoConf with warehousePath parameter.
+   *
+   * @param tajoConf Tajo system property.
+   * @param warehousePath The warehouse directory to be set in the tajoConf.
+   * @return
+   * @throws java.io.IOException
+   */
+  public static StorageManager getFileStorageManager(TajoConf tajoConf, Path 
warehousePath) throws IOException {
+    URI uri;
+    TajoConf copiedConf = new TajoConf(tajoConf);
+    if (warehousePath != null) {
+      copiedConf.setVar(ConfVars.WAREHOUSE_DIR, 
warehousePath.toUri().toString());
+    }
+    uri = TajoConf.getWarehouseDir(copiedConf).toUri();
+    String key = "file".equals(uri.getScheme()) ? "file" : uri.toString();
+    return getStorageManager(copiedConf, StoreType.CSV, key);
+  }
+
+  /**
+   * Returns the proper StorageManager instance according to the storeType.
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Storage type
+   * @return
+   * @throws java.io.IOException
+   */
+  public static StorageManager getStorageManager(TajoConf tajoConf, String 
storeType) throws IOException {
+    if ("HBASE".equals(storeType)) {
+      return getStorageManager(tajoConf, StoreType.HBASE);
+    } else {
+      return getStorageManager(tajoConf, StoreType.CSV);
+    }
+  }
+
+  /**
+   * Returns the proper StorageManager instance according to the storeType.
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Storage type
+   * @return
+   * @throws java.io.IOException
+   */
+  public static StorageManager getStorageManager(TajoConf tajoConf, StoreType 
storeType) throws IOException {
+    return getStorageManager(tajoConf, storeType, null);
+  }
+
+  /**
+   * Returns the proper StorageManager instance according to the storeType
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Storage type
+   * @param managerKey Key that can identify each storage manager(may be a 
path)
+   * @return
+   * @throws java.io.IOException
+   */
+  public static synchronized StorageManager getStorageManager (
+      TajoConf tajoConf, StoreType storeType, String managerKey) throws 
IOException {
+    synchronized (storageManagers) {
+      String storeKey = storeType + managerKey;
+      StorageManager manager = storageManagers.get(storeKey);
+      if (manager == null) {
+        String typeName = "hdfs";
+
+        switch (storeType) {
+          case HBASE:
+            typeName = "hbase";
+            break;
+          default:
+            typeName = "hdfs";
+        }
+
+        Class<? extends StorageManager> storageManagerClass =
+            tajoConf.getClass(String.format("tajo.storage.manager.%s.class", 
typeName), null, StorageManager.class);
+
+        if (storageManagerClass == null) {
+          throw new IOException("Unknown Storage Type: " + typeName);
+        }
+
+        try {
+          Constructor<? extends StorageManager> constructor =
+              (Constructor<? extends StorageManager>) 
CONSTRUCTOR_CACHE.get(storageManagerClass);
+          if (constructor == null) {
+            constructor = storageManagerClass.getDeclaredConstructor(new 
Class<?>[]{StoreType.class});
+            constructor.setAccessible(true);
+            CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
+          }
+          manager = constructor.newInstance(new Object[]{storeType});
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        manager.init(tajoConf);
+        storageManagers.put(storeKey, manager);
+      }
+
+      return manager;
+    }
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target Columns which are selected.
+   * @return Scanner instance
+   * @throws java.io.IOException
+   */
+  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto 
fragment, Schema target) throws IOException {
+    return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), 
target);
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @return Scanner instance
+   * @throws java.io.IOException
+   */
+  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) 
throws IOException {
+    return getScanner(meta, schema, fragment, schema);
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target The output schema
+   * @return Scanner instance
+   * @throws java.io.IOException
+   */
+  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, 
Schema target) throws IOException {
+    if (fragment.isEmpty()) {
+      Scanner scanner = new NullScanner(conf, schema, meta, fragment);
+      scanner.setTarget(target.toArray());
+
+      return scanner;
+    }
+
+    Scanner scanner;
+
+    Class<? extends Scanner> scannerClass = 
getScannerClass(meta.getStoreType());
+    scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment);
+    if (scanner.isProjectable()) {
+      scanner.setTarget(target.toArray());
+    }
+
+    return scanner;
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param conf The system property
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target The output schema
+   * @return Scanner instance
+   * @throws java.io.IOException
+   */
+  public static synchronized SeekableScanner getSeekableScanner(
+      TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema 
target) throws IOException {
+    return (SeekableScanner)getStorageManager(conf, 
meta.getStoreType()).getScanner(meta, schema, fragment, target);
+  }
+
+  /**
+   * Returns Appender instance.
+   * @param queryContext Query property.
+   * @param taskAttemptId Task id.
+   * @param meta Table meta data.
+   * @param schema Output schema.
+   * @param workDir Working directory
+   * @return Appender instance
+   * @throws java.io.IOException
+   */
+  public Appender getAppender(OverridableConf queryContext,
+                              QueryUnitAttemptId taskAttemptId, TableMeta 
meta, Schema schema, Path workDir)
+      throws IOException {
+    Appender appender;
+
+    Class<? extends Appender> appenderClass;
+
+    String handlerName = meta.getStoreType().name().toLowerCase();
+    appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
+    if (appenderClass == null) {
+      appenderClass = conf.getClass(
+          String.format("tajo.storage.appender-handler.%s.class",
+              meta.getStoreType().name().toLowerCase()), null, Appender.class);
+      APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+    }
+
+    if (appenderClass == null) {
+      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+    }
+
+    appender = newAppenderInstance(appenderClass, conf, taskAttemptId, meta, 
schema, workDir);
+
+    return appender;
+  }
+
+  /**
+   * Creates a scanner instance.
+   *
+   * @param theClass Concrete class of scanner
+   * @param conf System property
+   * @param schema Input schema
+   * @param meta Table meta data
+   * @param fragment The fragment for scanning
+   * @param <T>
+   * @return The scanner instance
+   */
+  public static <T> T newScannerInstance(Class<T> theClass, Configuration 
conf, Schema schema, TableMeta meta,
+                                         Fragment fragment) {
+    T result;
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return result;
+  }
+
+  /**
+   * Creates a scanner instance.
+   *
+   * @param theClass Concrete class of scanner
+   * @param conf System property
+   * @param taskAttemptId Task id
+   * @param meta Table meta data
+   * @param schema Input schema
+   * @param workDir Working directory
+   * @param <T>
+   * @return The scanner instance
+   */
+  public static <T> T newAppenderInstance(Class<T> theClass, Configuration 
conf, QueryUnitAttemptId taskAttemptId,
+                                          TableMeta meta, Schema schema, Path 
workDir) {
+    T result;
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, 
meta, workDir});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return result;
+  }
+
+  /**
+   * Return the Scanner class for the StoreType that is defined in 
storage-default.xml.
+   *
+   * @param storeType store type
+   * @return The Scanner class
+   * @throws java.io.IOException
+   */
+  public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType 
storeType) throws IOException {
+    String handlerName = storeType.name().toLowerCase();
+    Class<? extends Scanner> scannerClass = 
SCANNER_HANDLER_CACHE.get(handlerName);
+    if (scannerClass == null) {
+      scannerClass = conf.getClass(
+          
String.format("tajo.storage.scanner-handler.%s.class",storeType.name().toLowerCase()),
 null, Scanner.class);
+      SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
+    }
+
+    if (scannerClass == null) {
+      throw new IOException("Unknown Storage Type: " + storeType.name());
+    }
+
+    return scannerClass;
+  }
+
+  /**
+   * Return length of the fragment.
+   * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from 
the configuration.
+   *
+   * @param conf Tajo system property
+   * @param fragment Fragment
+   * @return
+   */
+  public static long getFragmentLength(TajoConf conf, Fragment fragment) {
+    if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
+      return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
+    } else {
+      return fragment.getLength();
+    }
+  }
+
+  /**
+   * It is called after making logical plan. Storage manager should verify the 
schema for inserting.
+   *
+   * @param tableDesc The table description of insert target.
+   * @param outSchema  The output schema of select query for inserting.
+   * @throws java.io.IOException
+   */
+  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) 
throws IOException {
+    // nothing to do
+  }
+
+  /**
+   * Returns the list of storage specified rewrite rules.
+   * This values are used by LogicalOptimizer.
+   *
+   * @param queryContext The query property
+   * @param tableDesc The description of the target table.
+   * @return The list of storage specified rewrite rules
+   * @throws java.io.IOException
+   */
+  public List<RewriteRule> getRewriteRules(OverridableConf queryContext, 
TableDesc tableDesc) throws IOException {
+    return null;
+  }
+
+  /**
+   * Finalizes result data. Tajo stores result data in the staging directory.
+   * If the query fails, clean up the staging directory.
+   * Otherwise the query is successful, move to the final directory from the 
staging directory.
+   *
+   * @param queryContext The query property
+   * @param finalEbId The final execution block id
+   * @param plan The query plan
+   * @param schema The final output schema
+   * @param tableDesc The description of the target table
+   * @return Saved path
+   * @throws java.io.IOException
+   */
+  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId 
finalEbId,
+                               LogicalPlan plan, Schema schema,
+                               TableDesc tableDesc) throws IOException {
+    return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, 
true);
+  }
+
+  /**
+   * Finalizes result data. Tajo stores result data in the staging directory.
+   * If the query fails, clean up the staging directory.
+   * Otherwise the query is successful, move to the final directory from the 
staging directory.
+   *
+   * @param queryContext The query property
+   * @param finalEbId The final execution block id
+   * @param plan The query plan
+   * @param schema The final output schema
+   * @param tableDesc The description of the target table
+   * @param changeFileSeq If true change result file name with max sequence.
+   * @return Saved path
+   * @throws java.io.IOException
+   */
+  protected Path commitOutputData(OverridableConf queryContext, 
ExecutionBlockId finalEbId,
+                               LogicalPlan plan, Schema schema,
+                               TableDesc tableDesc, boolean changeFileSeq) 
throws IOException {
+    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
+    Path stagingResultDir = new Path(stagingDir, 
TajoConstants.RESULT_DIR_NAME);
+    Path finalOutputDir;
+    if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
+      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
+      try {
+        FileSystem fs = stagingResultDir.getFileSystem(conf);
+
+        if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // 
INSERT OVERWRITE INTO
+
+          // It moves the original table into the temporary location.
+          // Then it moves the new result table into the original table 
location.
+          // Upon failed, it recovers the original table if possible.
+          boolean movedToOldTable = false;
+          boolean committed = false;
+          Path oldTableDir = new Path(stagingDir, 
TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+          ContentSummary summary = fs.getContentSummary(stagingResultDir);
+
+          if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && 
summary.getFileCount() > 0L) {
+            // This is a map for existing non-leaf directory to rename. A key 
is current directory and a value is
+            // renaming directory.
+            Map<Path, Path> renameDirs = TUtil.newHashMap();
+            // This is a map for recovering existing partition directory. A 
key is current directory and a value is
+            // temporary directory to back up.
+            Map<Path, Path> recoveryDirs = TUtil.newHashMap();
+
+            try {
+              if (!fs.exists(finalOutputDir)) {
+                fs.mkdirs(finalOutputDir);
+              }
+
+              visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, 
stagingResultDir.toString(),
+                  renameDirs, oldTableDir);
+
+              // Rename target partition directories
+              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+                // Backup existing data files for recovering
+                if (fs.exists(entry.getValue())) {
+                  String recoveryPathString = 
entry.getValue().toString().replaceAll(finalOutputDir.toString(),
+                      oldTableDir.toString());
+                  Path recoveryPath = new Path(recoveryPathString);
+                  fs.rename(entry.getValue(), recoveryPath);
+                  fs.exists(recoveryPath);
+                  recoveryDirs.put(entry.getValue(), recoveryPath);
+                }
+                // Delete existing directory
+                fs.delete(entry.getValue(), true);
+                // Rename staging directory to final output directory
+                fs.rename(entry.getKey(), entry.getValue());
+              }
+
+            } catch (IOException ioe) {
+              // Remove created dirs
+              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+                fs.delete(entry.getValue(), true);
+              }
+
+              // Recovery renamed dirs
+              for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
+                fs.delete(entry.getValue(), true);
+                fs.rename(entry.getValue(), entry.getKey());
+              }
+
+              throw new IOException(ioe.getMessage());
+            }
+          } else { // no partition
+            try {
+
+              // if the final output dir exists, move all contents to the 
temporary table dir.
+              // Otherwise, just make the final output dir. As a result, the 
final output dir will be empty.
+              if (fs.exists(finalOutputDir)) {
+                fs.mkdirs(oldTableDir);
+
+                for (FileStatus status : fs.listStatus(finalOutputDir, 
StorageManager.hiddenFileFilter)) {
+                  fs.rename(status.getPath(), oldTableDir);
+                }
+
+                movedToOldTable = fs.exists(oldTableDir);
+              } else { // if the parent does not exist, make its parent 
directory.
+                fs.mkdirs(finalOutputDir);
+              }
+
+              // Move the results to the final output dir.
+              for (FileStatus status : fs.listStatus(stagingResultDir)) {
+                fs.rename(status.getPath(), finalOutputDir);
+              }
+
+              // Check the final output dir
+              committed = fs.exists(finalOutputDir);
+
+            } catch (IOException ioe) {
+              // recover the old table
+              if (movedToOldTable && !committed) {
+
+                // if commit is failed, recover the old data
+                for (FileStatus status : fs.listStatus(finalOutputDir, 
StorageManager.hiddenFileFilter)) {
+                  fs.delete(status.getPath(), true);
+                }
+
+                for (FileStatus status : fs.listStatus(oldTableDir)) {
+                  fs.rename(status.getPath(), finalOutputDir);
+                }
+              }
+
+              throw new IOException(ioe.getMessage());
+            }
+          }
+        } else {
+          String queryType = queryContext.get(QueryVars.COMMAND_TYPE);
+
+          if (queryType != null && queryType.equals(NodeType.INSERT.name())) { 
// INSERT INTO an existing table
+
+            NumberFormat fmt = NumberFormat.getInstance();
+            fmt.setGroupingUsed(false);
+            fmt.setMinimumIntegerDigits(3);
+
+            if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
+              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+                if (eachFile.isFile()) {
+                  LOG.warn("Partition table can't have file in a staging dir: 
" + eachFile.getPath());
+                  continue;
+                }
+                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputDir, fmt, -1, changeFileSeq);
+              }
+            } else {
+              int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, 
false) + 1;
+              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+                if (eachFile.getPath().getName().startsWith("_")) {
+                  continue;
+                }
+                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputDir, fmt, maxSeq++, changeFileSeq);
+              }
+            }
+            // checking all file moved and remove empty dir
+            verifyAllFileMoved(fs, stagingResultDir);
+            FileStatus[] files = fs.listStatus(stagingResultDir);
+            if (files != null && files.length != 0) {
+              for (FileStatus eachFile: files) {
+                LOG.error("There are some unmoved files in staging dir:" + 
eachFile.getPath());
+              }
+            }
+          } else { // CREATE TABLE AS SELECT (CTAS)
+            if (fs.exists(finalOutputDir)) {
+              for (FileStatus status : fs.listStatus(stagingResultDir)) {
+                fs.rename(status.getPath(), finalOutputDir);
+              }
+            } else {
+              fs.rename(stagingResultDir, finalOutputDir);
+            }
+            LOG.info("Moved from the staging dir to the output directory '" + 
finalOutputDir);
+          }
+        }
+
+        // remove the staging directory if the final output dir is given.
+        Path stagingDirRoot = stagingDir.getParent();
+        fs.delete(stagingDirRoot, true);
+      } catch (Throwable t) {
+        LOG.error(t);
+        throw new IOException(t);
+      }
+    } else {
+      finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    }
+
+    return finalOutputDir;
+  }
+
+  /**
+   * Attach the sequence number to the output file name and than move the file 
into the final result path.
+   *
+   * @param fs FileSystem
+   * @param stagingResultDir The staging result dir
+   * @param fileStatus The file status
+   * @param finalOutputPath Final output path
+   * @param nf Number format
+   * @param fileSeq The sequence number
+   * @throws java.io.IOException
+   */
+  private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
+                                          FileStatus fileStatus, Path 
finalOutputPath,
+                                          NumberFormat nf,
+                                          int fileSeq, boolean changeFileSeq) 
throws IOException {
+    if (fileStatus.isDirectory()) {
+      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
+      if (subPath != null) {
+        Path finalSubPath = new Path(finalOutputPath, subPath);
+        if (!fs.exists(finalSubPath)) {
+          fs.mkdirs(finalSubPath);
+        }
+        int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
+        for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
+          if (eachFile.getPath().getName().startsWith("_")) {
+            continue;
+          }
+          moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputPath, nf, ++maxSeq, changeFileSeq);
+        }
+      } else {
+        throw new IOException("Wrong staging dir:" + stagingResultDir + "," + 
fileStatus.getPath());
+      }
+    } else {
+      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
+      if (subPath != null) {
+        Path finalSubPath = new Path(finalOutputPath, subPath);
+        if (changeFileSeq) {
+          finalSubPath = new Path(finalSubPath.getParent(), 
replaceFileNameSeq(finalSubPath, fileSeq, nf));
+        }
+        if (!fs.exists(finalSubPath.getParent())) {
+          fs.mkdirs(finalSubPath.getParent());
+        }
+        if (fs.exists(finalSubPath)) {
+          throw new IOException("Already exists data file:" + finalSubPath);
+        }
+        boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
+        if (success) {
+          LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
+              "to final output[" + finalSubPath + "]");
+        } else {
+          LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " 
+
+              "to final output[" + finalSubPath + "]");
+        }
+      }
+    }
+  }
+
+  /**
+   * Removes the path of the parent.
+   * @param parentPath
+   * @param childPath
+   * @return
+   */
+  private String extractSubPath(Path parentPath, Path childPath) {
+    String parentPathStr = parentPath.toUri().getPath();
+    String childPathStr = childPath.toUri().getPath();
+
+    if (parentPathStr.length() > childPathStr.length()) {
+      return null;
+    }
+
+    int index = childPathStr.indexOf(parentPathStr);
+    if (index != 0) {
+      return null;
+    }
+
+    return childPathStr.substring(parentPathStr.length() + 1);
+  }
+
+  /**
+   * Attach the sequence number to a path.
+   *
+   * @param path Path
+   * @param seq sequence number
+   * @param nf Number format
+   * @return New path attached with sequence number
+   * @throws java.io.IOException
+   */
+  private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) 
throws IOException {
+    String[] tokens = path.getName().split("-");
+    if (tokens.length != 4) {
+      throw new IOException("Wrong result file name:" + path);
+    }
+    return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + 
nf.format(seq);
+  }
+
+  /**
+   * Make sure all files are moved.
+   * @param fs FileSystem
+   * @param stagingPath The stagind directory
+   * @return
+   * @throws java.io.IOException
+   */
+  private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws 
IOException {
+    FileStatus[] files = fs.listStatus(stagingPath);
+    if (files != null && files.length != 0) {
+      for (FileStatus eachFile: files) {
+        if (eachFile.isFile()) {
+          LOG.error("There are some unmoved files in staging dir:" + 
eachFile.getPath());
+          return false;
+        } else {
+          if (verifyAllFileMoved(fs, eachFile.getPath())) {
+            fs.delete(eachFile.getPath(), false);
+          } else {
+            return false;
+          }
+        }
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * This method sets a rename map which includes renamed staging directory to 
final output directory recursively.
+   * If there exists some data files, this delete it for duplicate data.
+   *
+   *
+   * @param fs
+   * @param stagingPath
+   * @param outputPath
+   * @param stagingParentPathString
+   * @throws java.io.IOException
+   */
+  private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path 
outputPath,
+                                         String stagingParentPathString,
+                                         Map<Path, Path> renameDirs, Path 
oldTableDir) throws IOException {
+    FileStatus[] files = fs.listStatus(stagingPath);
+
+    for(FileStatus eachFile : files) {
+      if (eachFile.isDirectory()) {
+        Path oldPath = eachFile.getPath();
+
+        // Make recover directory.
+        String recoverPathString = 
oldPath.toString().replaceAll(stagingParentPathString,
+            oldTableDir.toString());
+        Path recoveryPath = new Path(recoverPathString);
+        if (!fs.exists(recoveryPath)) {
+          fs.mkdirs(recoveryPath);
+        }
+
+        visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, 
stagingParentPathString,
+            renameDirs, oldTableDir);
+        // Find last order partition for renaming
+        String newPathString = 
oldPath.toString().replaceAll(stagingParentPathString,
+            outputPath.toString());
+        Path newPath = new Path(newPathString);
+        if (!isLeafDirectory(fs, eachFile.getPath())) {
+          renameDirs.put(eachFile.getPath(), newPath);
+        } else {
+          if (!fs.exists(newPath)) {
+            fs.mkdirs(newPath);
+          }
+        }
+      }
+    }
+  }
+
+  private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException 
{
+    boolean retValue = false;
+
+    FileStatus[] files = fs.listStatus(path);
+    for (FileStatus file : files) {
+      if (fs.isDirectory(file.getPath())) {
+        retValue = true;
+        break;
+      }
+    }
+
+    return retValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
new file mode 100644
index 0000000..6816d08
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+public class StorageProperty {
+  private boolean supportsInsertInto;
+  private boolean sortedInsert;
+
+  public boolean isSupportsInsertInto() {
+    return supportsInsertInto;
+  }
+
+  public void setSupportsInsertInto(boolean supportsInsertInto) {
+    this.supportsInsertInto = supportsInsertInto;
+  }
+
+  public boolean isSortedInsert() {
+    return sortedInsert;
+  }
+
+  public void setSortedInsert(boolean sortedInsert) {
+    this.sortedInsert = sortedInsert;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
new file mode 100644
index 0000000..2b196c9
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.util.FileUtil;
+import sun.nio.ch.DirectBuffer;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class StorageUtil extends StorageConstants {
+  public static int getRowByteSize(Schema schema) {
+    int sum = 0;
+    for(Column col : schema.getColumns()) {
+      sum += StorageUtil.getColByteSize(col);
+    }
+
+    return sum;
+  }
+
+  public static int getColByteSize(Column col) {
+    switch (col.getDataType().getType()) {
+      case BOOLEAN:
+        return 1;
+      case CHAR:
+        return 1;
+      case BIT:
+        return 1;
+      case INT2:
+        return 2;
+      case INT4:
+        return 4;
+      case INT8:
+        return 8;
+      case FLOAT4:
+        return 4;
+      case FLOAT8:
+        return 8;
+      case INET4:
+        return 4;
+      case INET6:
+        return 32;
+      case TEXT:
+        return 256;
+      case BLOB:
+        return 256;
+      case DATE:
+        return 4;
+      case TIME:
+        return 8;
+      case TIMESTAMP:
+        return 8;
+      default:
+        return 0;
+    }
+  }
+
+  public static void writeTableMeta(Configuration conf, Path tableroot, 
TableMeta meta) throws IOException {
+    FileSystem fs = tableroot.getFileSystem(conf);
+    FSDataOutputStream out = fs.create(new Path(tableroot, ".meta"));
+    FileUtil.writeProto(out, meta.getProto());
+    out.flush();
+    out.close();
+  }
+  
+  public static Path concatPath(String parent, String...childs) {
+    return concatPath(new Path(parent), childs);
+  }
+  
+  public static Path concatPath(Path parent, String...childs) {
+    StringBuilder sb = new StringBuilder();
+    
+    for(int i=0; i < childs.length; i++) {      
+      sb.append(childs[i]);
+      if(i < childs.length - 1)
+        sb.append("/");
+    }
+    
+    return new Path(parent, sb.toString());
+  }
+
+  static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*";
+  static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*";
+
+  /**
+   * Written files can be one of two forms: "part-[0-9]*-[0-9]*" or 
"part-[0-9]*-[0-9]*-[0-9]*".
+   *
+   * This method finds the maximum sequence number from existing data files 
through the above patterns.
+   * If it cannot find any matched file or the maximum number, it will return 
-1.
+   *
+   * @param fs
+   * @param path
+   * @param recursive
+   * @return The maximum sequence number
+   * @throws java.io.IOException
+   */
+  public static int getMaxFileSequence(FileSystem fs, Path path, boolean 
recursive) throws IOException {
+    if (!fs.isDirectory(path)) {
+      return -1;
+    }
+
+    FileStatus[] files = fs.listStatus(path);
+
+    if (files == null || files.length == 0) {
+      return -1;
+    }
+
+    int maxValue = -1;
+    List<Path> fileNamePatternMatchedList = new ArrayList<Path>();
+
+    for (FileStatus eachFile: files) {
+      // In the case of partition table, return largest value within all 
partition dirs.
+      if (eachFile.isDirectory() && recursive) {
+        int value = getMaxFileSequence(fs, eachFile.getPath(), recursive);
+        if (value > maxValue) {
+          maxValue = value;
+        }
+      } else {
+        if (eachFile.getPath().getName().matches(fileNamePatternV08) ||
+            eachFile.getPath().getName().matches(fileNamePatternV09)) {
+          fileNamePatternMatchedList.add(eachFile.getPath());
+        }
+      }
+    }
+
+    if (fileNamePatternMatchedList.isEmpty()) {
+      return maxValue;
+    }
+    Path lastFile = 
fileNamePatternMatchedList.get(fileNamePatternMatchedList.size() - 1);
+    String pathName = lastFile.getName();
+
+    // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>
+    // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence>
+    String[] pathTokens = pathName.split("-");
+    if (pathTokens.length == 3) {
+      return -1;
+    } else if(pathTokens.length == 4) {
+      return Integer.parseInt(pathTokens[3]);
+    } else {
+      return -1;
+    }
+  }
+
+  public static void closeBuffer(ByteBuffer buffer) {
+    if (buffer != null) {
+      if (buffer.isDirect()) {
+        ((DirectBuffer) buffer).cleaner().clean();
+      } else {
+        buffer.clear();
+      }
+    }
+  }
+
+  public static int readFully(InputStream is, byte[] buffer, int offset, int 
length)
+      throws IOException {
+    int nread = 0;
+    while (nread < length) {
+      int nbytes = is.read(buffer, offset + nread, length - nread);
+      if (nbytes < 0) {
+        return nread > 0 ? nread : nbytes;
+      }
+      nread += nbytes;
+    }
+    return nread;
+  }
+
+  /**
+   * Similar to readFully(). Skips bytes in a loop.
+   * @param in The DataInput to skip bytes from
+   * @param len number of bytes to skip.
+   * @throws java.io.IOException if it could not skip requested number of bytes
+   * for any reason (including EOF)
+   */
+  public static void skipFully(DataInput in, int len) throws IOException {
+    int amt = len;
+    while (amt > 0) {
+      long ret = in.skipBytes(amt);
+      if (ret == 0) {
+        // skip may return 0 even if we're not at EOF.  Luckily, we can
+        // use the read() method to figure out if we're at the end.
+        int b = in.readByte();
+        if (b == -1) {
+          throw new EOFException( "Premature EOF from inputStream after " +
+              "skipping " + (len - amt) + " byte(s).");
+        }
+        ret = 1;
+      }
+      amt -= ret;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
new file mode 100644
index 0000000..a2c08de
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+
+/**
+ * This class is not thread-safe.
+ */
+public class TableStatistics {
+  private static final Log LOG = LogFactory.getLog(TableStatistics.class);
+  private Schema schema;
+  private Tuple minValues;
+  private Tuple maxValues;
+  private long [] numNulls;
+  private long numRows = 0;
+  private long numBytes = 0;
+
+  private boolean [] comparable;
+
+  public TableStatistics(Schema schema) {
+    this.schema = schema;
+    minValues = new VTuple(schema.size());
+    maxValues = new VTuple(schema.size());
+
+    numNulls = new long[schema.size()];
+    comparable = new boolean[schema.size()];
+
+    DataType type;
+    for (int i = 0; i < schema.size(); i++) {
+      type = schema.getColumn(i).getDataType();
+      if (type.getType() == Type.PROTOBUF) {
+        comparable[i] = false;
+      } else {
+        comparable[i] = true;
+      }
+    }
+  }
+
+  public Schema getSchema() {
+    return this.schema;
+  }
+
+  public void incrementRow() {
+    numRows++;
+  }
+
+  public long getNumRows() {
+    return this.numRows;
+  }
+
+  public void setNumBytes(long bytes) {
+    this.numBytes = bytes;
+  }
+
+  public long getNumBytes() {
+    return this.numBytes;
+  }
+
+  public void analyzeField(int idx, Datum datum) {
+    if (datum instanceof NullDatum) {
+      numNulls[idx]++;
+      return;
+    }
+
+    if (comparable[idx]) {
+      if (!maxValues.contains(idx) ||
+          maxValues.get(idx).compareTo(datum) < 0) {
+        maxValues.put(idx, datum);
+      }
+      if (!minValues.contains(idx) ||
+          minValues.get(idx).compareTo(datum) > 0) {
+        minValues.put(idx, datum);
+      }
+    }
+  }
+
+  public TableStats getTableStat() {
+    TableStats stat = new TableStats();
+
+    ColumnStats columnStats;
+    for (int i = 0; i < schema.size(); i++) {
+      columnStats = new ColumnStats(schema.getColumn(i));
+      columnStats.setNumNulls(numNulls[i]);
+      if (minValues.get(i) == null || 
schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) {
+        columnStats.setMinValue(minValues.get(i));
+      } else {
+        LOG.warn("Wrong statistics column type (" + minValues.get(i).type() +
+            ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
+      }
+      if (maxValues.get(i) == null || 
schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) {
+        columnStats.setMaxValue(maxValues.get(i));
+      } else {
+        LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() +
+            ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
+      }
+      stat.addColumnStat(columnStats);
+    }
+
+    stat.setNumRows(this.numRows);
+    stat.setNumBytes(this.numBytes);
+
+    return stat;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
new file mode 100644
index 0000000..ab8816b
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.protobuf.Message;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.NumberUtil;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.TimeZone;
+
+// Compatibility with Apache Hive
+@Deprecated
+public class TextSerializerDeserializer implements SerializerDeserializer {
+  public static final byte[] trueBytes = "true".getBytes();
+  public static final byte[] falseBytes = "false".getBytes();
+  private ProtobufJsonFormat protobufJsonFormat = 
ProtobufJsonFormat.getInstance();
+
+  @Override
+  public int serialize(Column col, Datum datum, OutputStream out, byte[] 
nullCharacters) throws IOException {
+
+    byte[] bytes;
+    int length = 0;
+    TajoDataTypes.DataType dataType = col.getDataType();
+
+    if (datum == null || datum instanceof NullDatum) {
+      switch (dataType.getType()) {
+        case CHAR:
+        case TEXT:
+          length = nullCharacters.length;
+          out.write(nullCharacters);
+          break;
+        default:
+          break;
+      }
+      return length;
+    }
+
+    switch (dataType.getType()) {
+      case BOOLEAN:
+        out.write(datum.asBool() ? trueBytes : falseBytes);
+        length = trueBytes.length;
+        break;
+      case CHAR:
+        byte[] pad = new byte[dataType.getLength() - datum.size()];
+        bytes = datum.asTextBytes();
+        out.write(bytes);
+        out.write(pad);
+        length = bytes.length + pad.length;
+        break;
+      case TEXT:
+      case BIT:
+      case INT2:
+      case INT4:
+      case INT8:
+      case FLOAT4:
+      case FLOAT8:
+      case INET4:
+      case DATE:
+      case INTERVAL:
+        bytes = datum.asTextBytes();
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case TIME:
+        bytes = ((TimeDatum)datum).asChars(TimeZone.getDefault(), 
true).getBytes();
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case TIMESTAMP:
+        bytes = ((TimestampDatum)datum).asChars(TimeZone.getDefault(), 
true).getBytes();
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case INET6:
+      case BLOB:
+        bytes = Base64.encodeBase64(datum.asByteArray(), false);
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case PROTOBUF:
+        ProtobufDatum protobuf = (ProtobufDatum) datum;
+        byte[] protoBytes = 
protobufJsonFormat.printToString(protobuf.get()).getBytes();
+        length = protoBytes.length;
+        out.write(protoBytes, 0, protoBytes.length);
+        break;
+      case NULL_TYPE:
+      default:
+        break;
+    }
+    return length;
+  }
+
+  @Override
+  public Datum deserialize(Column col, byte[] bytes, int offset, int length, 
byte[] nullCharacters) throws IOException {
+
+    Datum datum;
+    switch (col.getDataType().getType()) {
+      case BOOLEAN:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 
'T');
+        break;
+      case BIT:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, 
length)));
+        break;
+      case CHAR:
+        datum = isNullText(bytes, offset, length, nullCharacters) ? 
NullDatum.get()
+            : DatumFactory.createChar(new String(bytes, offset, 
length).trim());
+        break;
+      case INT1:
+      case INT2:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInt2((short) NumberUtil.parseInt(bytes, 
offset, length));
+        break;
+      case INT4:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInt4(NumberUtil.parseInt(bytes, offset, 
length));
+        break;
+      case INT8:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInt8(new String(bytes, offset, length));
+        break;
+      case FLOAT4:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createFloat4(new String(bytes, offset, length));
+        break;
+      case FLOAT8:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, offset, 
length));
+        break;
+      case TEXT: {
+        byte[] chars = new byte[length];
+        System.arraycopy(bytes, offset, chars, 0, length);
+        datum = isNullText(bytes, offset, length, nullCharacters) ? 
NullDatum.get()
+            : DatumFactory.createText(chars);
+        break;
+      }
+      case DATE:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createDate(new String(bytes, offset, length));
+        break;
+      case TIME:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createTime(new String(bytes, offset, length));
+        break;
+      case TIMESTAMP:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createTimestamp(new String(bytes, offset, length));
+        break;
+      case INTERVAL:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInterval(new String(bytes, offset, length));
+        break;
+      case PROTOBUF: {
+        if (isNull(bytes, offset, length, nullCharacters)) {
+          datum = NullDatum.get();
+        } else {
+          ProtobufDatumFactory factory = 
ProtobufDatumFactory.get(col.getDataType());
+          Message.Builder builder = factory.newBuilder();
+          try {
+            byte[] protoBytes = new byte[length];
+            System.arraycopy(bytes, offset, protoBytes, 0, length);
+            protobufJsonFormat.merge(protoBytes, builder);
+            datum = factory.createDatum(builder.build());
+          } catch (IOException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        }
+        break;
+      }
+      case INET4:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInet4(new String(bytes, offset, length));
+        break;
+      case BLOB: {
+        if (isNull(bytes, offset, length, nullCharacters)) {
+          datum = NullDatum.get();
+        } else {
+          byte[] blob = new byte[length];
+          System.arraycopy(bytes, offset, blob, 0, length);
+          datum = DatumFactory.createBlob(Base64.decodeBase64(blob));
+        }
+        break;
+      }
+      default:
+        datum = NullDatum.get();
+        break;
+    }
+    return datum;
+  }
+
+  private static boolean isNull(byte[] val, int offset, int length, byte[] 
nullBytes) {
+    return length == 0 || ((length == nullBytes.length)
+        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length));
+  }
+
+  private static boolean isNullText(byte[] val, int offset, int length, byte[] 
nullBytes) {
+    return length > 0 && length == nullBytes.length
+        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleComparator.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleComparator.java
new file mode 100644
index 0000000..8dffd8d
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -0,0 +1,32 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.common.ProtoObject;
+
+import java.util.Comparator;
+
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+public abstract class TupleComparator implements Comparator<Tuple>, 
ProtoObject<TupleComparatorProto> {
+
+  public abstract int compare(Tuple o1, Tuple o2);
+
+  public abstract boolean isAscendingFirstKey();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
new file mode 100644
index 0000000..e824b99
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+
+import java.util.Comparator;
+
+/**
+ * It represents a pair of start and end tuples.
+ */
+public class TupleRange implements Comparable<TupleRange>, Cloneable {
+  private Tuple start;
+  private Tuple end;
+  private final TupleComparator comp;
+
+  public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple 
end) {
+    this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), 
sortSpecs);
+    // if there is only one value, start == end
+    this.start = start;
+    this.end = end;
+  }
+
+  public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) {
+    Schema schema = new Schema();
+    for (SortSpec spec : sortSpecs) {
+      schema.addColumn(spec.getSortKey());
+    }
+
+    return schema;
+  }
+
+  public void setStart(Tuple tuple) {
+    this.start = tuple;
+  }
+
+  public final Tuple getStart() {
+    return this.start;
+  }
+
+  public void setEnd(Tuple tuple) {
+    this.end = tuple;
+  }
+
+  public final Tuple getEnd() {
+    return this.end;
+  }
+
+  public String toString() {
+    return "[" + this.start + ", " + this.end + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(start, end);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof TupleRange) {
+      TupleRange other = (TupleRange) obj;
+      return this.start.equals(other.start) && this.end.equals(other.end);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int compareTo(TupleRange o) {
+    // TODO - should handle overlap
+    int cmpVal = comp.compare(this.start, o.start);
+    if (cmpVal != 0) {
+      return cmpVal;
+    } else {
+      return comp.compare(this.end, o.end);
+    }
+  }
+
+  public static class DescendingTupleRangeComparator
+      implements Comparator<TupleRange> {
+
+    @Override
+    public int compare(TupleRange left, TupleRange right) {
+      return right.compareTo(left);
+    }
+  }
+
+  public TupleRange clone() throws CloneNotSupportedException {
+    TupleRange newRange = (TupleRange) super.clone();
+    newRange.setStart(start.clone());
+    newRange.setEnd(end.clone());
+    return newRange;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
new file mode 100644
index 0000000..ad19101
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface ForSplitableStore {
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
new file mode 100644
index 0000000..baeda8c
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.compress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DoNotPool;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A global compressor/decompressor pool used to save and reuse (possibly
+ * native) compression/decompression codecs.
+ */
+public final class CodecPool {
+  private static final Log LOG = LogFactory.getLog(CodecPool.class);
+
+  /**
+   * A global compressor pool used to save the expensive
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Compressor>, List<Compressor>> 
COMPRESSOR_POOL =
+      new HashMap<Class<Compressor>, List<Compressor>>();
+
+  /**
+   * A global decompressor pool used to save the expensive
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Decompressor>, List<Decompressor>> 
DECOMPRESSOR_POOL =
+      new HashMap<Class<Decompressor>, List<Decompressor>>();
+
+  private static <T> T borrow(Map<Class<T>, List<T>> pool,
+      Class<? extends T> codecClass) {
+    T codec = null;
+
+    // Check if an appropriate codec is available
+    synchronized (pool) {
+      if (pool.containsKey(codecClass)) {
+        List<T> codecList = pool.get(codecClass);
+
+        if (codecList != null) {
+          synchronized (codecList) {
+            if (!codecList.isEmpty()) {
+              codec = codecList.remove(codecList.size() - 1);
+            }
+          }
+        }
+      }
+    }
+
+    return codec;
+  }
+
+  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
+    if (codec != null) {
+      Class<T> codecClass = (Class<T>) codec.getClass();
+      synchronized (pool) {
+        if (!pool.containsKey(codecClass)) {
+          pool.put(codecClass, new ArrayList<T>());
+        }
+
+        List<T> codecList = pool.get(codecClass);
+        synchronized (codecList) {
+          codecList.add(codec);
+        }
+      }
+    }
+  }
+
+  /**
+   * Get a {@link Compressor} for the given {@link CompressionCodec} from the
+   * pool or a new one.
+   *
+   * @param codec
+   *          the <code>CompressionCodec</code> for which to get the
+   *          <code>Compressor</code>
+   * @param conf the <code>Configuration</code> object which contains confs 
for creating or reinit the compressor
+   * @return <code>Compressor</code> for the given 
<code>CompressionCodec</code>
+   *         from the pool or a new one
+   */
+  public static Compressor getCompressor(CompressionCodec codec, Configuration 
conf) {
+    Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
+    if (compressor == null) {
+      compressor = codec.createCompressor();
+      LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
+    } else {
+      compressor.reinit(conf);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Got recycled compressor");
+      }
+    }
+    return compressor;
+  }
+
+  public static Compressor getCompressor(CompressionCodec codec) {
+    return getCompressor(codec, null);
+  }
+
+  /**
+   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
+   * pool or a new one.
+   *
+   * @param codec
+   *          the <code>CompressionCodec</code> for which to get the
+   *          <code>Decompressor</code>
+   * @return <code>Decompressor</code> for the given
+   *         <code>CompressionCodec</code> the pool or a new one
+   */
+  public static Decompressor getDecompressor(CompressionCodec codec) {
+    Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec
+        .getDecompressorType());
+    if (decompressor == null) {
+      decompressor = codec.createDecompressor();
+      LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
+    } else {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Got recycled decompressor");
+      }
+    }
+    return decompressor;
+  }
+
+  /**
+   * Return the {@link Compressor} to the pool.
+   *
+   * @param compressor
+   *          the <code>Compressor</code> to be returned to the pool
+   */
+  public static void returnCompressor(Compressor compressor) {
+    if (compressor == null) {
+      return;
+    }
+    // if the compressor can't be reused, don't pool it.
+    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      return;
+    }
+    compressor.reset();
+    payback(COMPRESSOR_POOL, compressor);
+  }
+
+  /**
+   * Return the {@link Decompressor} to the pool.
+   *
+   * @param decompressor
+   *          the <code>Decompressor</code> to be returned to the pool
+   */
+  public static void returnDecompressor(Decompressor decompressor) {
+    if (decompressor == null) {
+      return;
+    }
+    // if the decompressor can't be reused, don't pool it.
+    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      return;
+    }
+    decompressor.reset();
+    payback(DECOMPRESSOR_POOL, decompressor);
+  }
+
+  private CodecPool() {
+    // prevent instantiation
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
new file mode 100644
index 0000000..bb035a8
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.storage.exception;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+public class AlreadyExistsStorageException extends IOException {
+  private static final long serialVersionUID = 965518916144019032L;
+
+
+  public AlreadyExistsStorageException(String path) {
+    super("Error: "+path+" alreay exists");    
+  }
+  
+  public AlreadyExistsStorageException(Path path) {
+    this(path.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
new file mode 100644
index 0000000..a67d1f7
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.exception;
+
+public class UnknownCodecException extends Exception {
+
+  private static final long serialVersionUID = 4287230843540404529L;
+
+  public UnknownCodecException() {
+
+  }
+
+  public UnknownCodecException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
new file mode 100644
index 0000000..d18b5a0
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.exception;
+
+public class UnknownDataTypeException extends Exception {
+
+  private static final long serialVersionUID = -2630390595968966164L;
+
+  public UnknownDataTypeException() {
+
+  }
+
+  public UnknownDataTypeException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
new file mode 100644
index 0000000..8b197d6
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.storage.exception;
+
+public class UnsupportedFileTypeException extends RuntimeException {
+       private static final long serialVersionUID = -8160289695849000342L;
+
+       public UnsupportedFileTypeException() {
+       }
+
+       /**
+        * @param message
+        */
+       public UnsupportedFileTypeException(String message) {
+               super(message);
+       }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
new file mode 100644
index 0000000..ac43197
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import org.apache.tajo.common.ProtoObject;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public interface Fragment extends ProtoObject<FragmentProto> {
+
+  public abstract String getTableName();
+
+  @Override
+  public abstract FragmentProto getProto();
+
+  public abstract long getLength();
+
+  public abstract String getKey();
+
+  public String[] getHosts();
+
+  public abstract boolean isEmpty();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
new file mode 100644
index 0000000..07720c7
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.annotation.ThreadSafe;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+@ThreadSafe
+public class FragmentConvertor {
+  /**
+   * Cache of fragment classes
+   */
+  protected static final Map<String, Class<? extends Fragment>> 
CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap();
+  /**
+   * Cache of constructors for each class.
+   */
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = 
Maps.newConcurrentMap();
+  /**
+   * default parameter for all constructors
+   */
+  private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class 
};
+
+  public static Class<? extends Fragment> getFragmentClass(Configuration conf, 
String storeType)
+  throws IOException {
+    Class<? extends Fragment> fragmentClass = 
CACHED_FRAGMENT_CLASSES.get(storeType.toLowerCase());
+    if (fragmentClass == null) {
+      fragmentClass = conf.getClass(
+          String.format("tajo.storage.fragment.%s.class", 
storeType.toLowerCase()), null, Fragment.class);
+      CACHED_FRAGMENT_CLASSES.put(storeType.toLowerCase(), fragmentClass);
+    }
+
+    if (fragmentClass == null) {
+      throw new IOException("No such a fragment for " + 
storeType.toLowerCase());
+    }
+
+    return fragmentClass;
+  }
+
+  public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto 
fragment) {
+    T result;
+    try {
+      Constructor<T> constructor = (Constructor<T>) 
CONSTRUCTOR_CACHE.get(clazz);
+      if (constructor == null) {
+        constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS);
+        constructor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(clazz, constructor);
+      }
+      result = constructor.newInstance(new Object[]{fragment.getContents()});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return result;
+  }
+
+  public static <T extends Fragment> T convert(Configuration conf, 
FragmentProto fragment)
+      throws IOException {
+    Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, 
fragment.getStoreType().toLowerCase());
+    if (fragmentClass == null) {
+      throw new IOException("No such a fragment class for " + 
fragment.getStoreType());
+    }
+    return convert(fragmentClass, fragment);
+  }
+
+  public static <T extends Fragment> List<T> convert(Class<T> clazz, 
FragmentProto...fragments)
+      throws IOException {
+    List<T> list = Lists.newArrayList();
+    if (fragments == null) {
+      return list;
+    }
+    for (FragmentProto proto : fragments) {
+      list.add(convert(clazz, proto));
+    }
+    return list;
+  }
+
+  public static <T extends Fragment> List<T> convert(Configuration conf, 
FragmentProto...fragments) throws IOException {
+    List<T> list = Lists.newArrayList();
+    if (fragments == null) {
+      return list;
+    }
+    for (FragmentProto proto : fragments) {
+      list.add((T) convert(conf, proto));
+    }
+    return list;
+  }
+
+  public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) 
{
+    List<FragmentProto> list = Lists.newArrayList();
+    if (fragments == null) {
+      return list;
+    }
+    for (Fragment fragment : fragments) {
+      list.add(fragment.getProto());
+    }
+    return list;
+  }
+
+  public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) {
+    List<FragmentProto> list = toFragmentProtoList(fragments);
+    return list.toArray(new FragmentProto[list.size()]);
+  }
+}

Reply via email to