http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 23c2406..e37be58 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -18,43 +18,65 @@
 
 package org.apache.tajo.storage;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.RewriteRule;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.storage.hbase.HBaseStorageManager;
+import org.apache.tajo.util.TUtil;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.URI;
+import java.text.NumberFormat;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * StorageManager
+ * StorageManager manages the functions of storing and reading data.
+ * StorageManager is an abstract class.
+ * To support a specific storage such as HDFS or HBase, a concrete StorageManager should be 
implemented by inheriting this class.
+ *
  */
 public abstract class StorageManager {
   private final Log LOG = LogFactory.getLog(StorageManager.class);
 
+  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+      Configuration.class,
+      Schema.class,
+      TableMeta.class,
+      Fragment.class
+  };
+
+  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+      Configuration.class,
+      QueryUnitAttemptId.class,
+      Schema.class,
+      TableMeta.class,
+      Path.class
+  };
+
   protected TajoConf conf;
+  protected StoreType storeType;
 
+  /**
+   * Cache of StorageManager.
+   * Key is manager key(warehouse path) + store type
+   */
   private static final Map<String, StorageManager> storageManagers = 
Maps.newHashMap();
 
   /**
@@ -66,8 +88,8 @@ public abstract class StorageManager {
   /**
    * Cache of appender handlers for each storage type.
    */
-  protected static final Map<String, Class<? extends FileAppender>> 
APPENDER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends FileAppender>>();
+  protected static final Map<String, Class<? extends Appender>> 
APPENDER_HANDLER_CACHE
+      = new ConcurrentHashMap<String, Class<? extends Appender>>();
 
   /**
    * Cache of constructors for each class. Pins the classes so they
@@ -76,20 +98,169 @@ public abstract class StorageManager {
   private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
       new ConcurrentHashMap<Class<?>, Constructor<?>>();
 
-  protected abstract void storageInit() throws IOException ;
-  public abstract void createTable(TableDesc tableDesc) throws IOException;
+  public StorageManager(StoreType storeType) {
+    this.storeType = storeType;
+  }
+
+  /**
+   * Initialize storage manager.
+   * @throws IOException
+   */
+  protected abstract void storageInit() throws IOException;
+
+  /**
+   * This method is called after executing "CREATE TABLE" statement.
+   * If a storage is a file-based storage, a storage manager may create a 
directory.
+   *
+   * @param tableDesc Table description which is created.
+   * @param ifNotExists Creates the table only when the table does not exist.
+   * @throws IOException
+   */
+  public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) 
throws IOException;
+
+  /**
+   * This method is called after executing "DROP TABLE" statement with the 
'PURGE' option
+   * which is the option to delete all the data.
+   *
+   * @param tableDesc
+   * @throws IOException
+   */
   public abstract void purgeTable(TableDesc tableDesc) throws IOException;
-  public abstract List<Fragment> getSplits(String fragmentId, TableDesc 
tableDesc) throws IOException;
 
+  /**
+   * Returns the splits that will serve as input for the scan tasks. The
+   * number of splits matches the number of regions in a table.
+   * @param fragmentId The table name or previous ExecutionBlockId
+   * @param tableDesc The table description for the target data.
+   * @param scanNode The logical node for scanning.
+   * @return The list of input fragments.
+   * @throws IOException
+   */
+  public abstract List<Fragment> getSplits(String fragmentId, TableDesc 
tableDesc,
+                                           ScanNode scanNode) throws 
IOException;
+
+  /**
+   * It returns the splits that will serve as input for the non-forward query 
scanner such as 'select * from table1'.
+   * The result list should be small. If there are many fragments for scanning, 
TajoMaster uses paging navigation.
+   * @param tableDesc The table description for the target data.
+   * @param currentPage The current page number within the entire list.
+   * @param numFragments The number of fragments in the result.
+   * @return The list of input fragments.
+   * @throws IOException
+   */
+  public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int 
currentPage, int numFragments)
+      throws IOException;
+
+  /**
+   * It returns the storage property.
+   * @return The storage property
+   */
+  public abstract StorageProperty getStorageProperty();
+
+  /**
+   * Release storage manager resource
+   */
+  public abstract void closeStorageManager();
+
+  /**
+   * It is called by a Repartitioner for range shuffling when the 
SortRangeType of SortNode is STORAGE_SPECIFIED.
+   * In general Repartitioner determines the partition range using previous 
output statistics data.
+   * In special cases, such as HBase, the Repartitioner uses the result of this 
method.
+   *
+   * @param queryContext The current query context which contains query 
properties.
+   * @param tableDesc The table description for the target data.
+   * @param inputSchema The input schema
+   * @param sortSpecs The sort specification that contains the sort column and 
sort order.
+   * @return The list of sort ranges.
+   * @throws IOException
+   */
+  public abstract TupleRange[] getInsertSortRanges(OverridableConf 
queryContext, TableDesc tableDesc,
+                                                   Schema inputSchema, 
SortSpec[] sortSpecs,
+                                                   TupleRange dataRange) 
throws IOException;
+
+  /**
+   * This method is called before executing 'INSERT' or 'CREATE TABLE as 
SELECT'.
+   * In general Tajo creates the target table after finishing the final 
sub-query of CTAS.
+   * But in special cases, such as HBase, an INSERT or CTAS query uses the 
target table information.
+   * That kind of storage should implement the logic related to creating a 
table in this method.
+   *
+   * @param node The child node of the root node.
+   * @throws IOException
+   */
+  public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException;
+
+  /**
+   * It is called when the query failed.
+   * Each storage manager should implement in this method the processing to be done when a query 
fails.
+   *
+   * @param node The child node of the root node.
+   * @throws IOException
+   */
+  public abstract void rollbackOutputCommit(LogicalNode node) throws 
IOException;
+
+  /**
+   * Returns the current storage type.
+   * @return
+   */
+  public StoreType getStoreType() {
+    return storeType;
+  }
+
+  /**
+   * Initialize StorageManager instance. It should be called before using.
+   *
+   * @param tajoConf
+   * @throws IOException
+   */
   public void init(TajoConf tajoConf) throws IOException {
     this.conf = tajoConf;
     storageInit();
   }
 
+  /**
+   * Close StorageManager
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    synchronized(storageManagers) {
+      for (StorageManager eachStorageManager: storageManagers.values()) {
+        eachStorageManager.closeStorageManager();
+      }
+    }
+  }
+
+  /**
+   * Returns the splits that will serve as input for the scan tasks. The
+   * number of splits matches the number of regions in a table.
+   *
+   * @param fragmentId The table name or previous ExecutionBlockId
+   * @param tableDesc The table description for the target data.
+   * @return The list of input fragments.
+   * @throws IOException
+   */
+  public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) 
throws IOException {
+    return getSplits(fragmentId, tableDesc, null);
+  }
+
+  /**
+   * Returns FileStorageManager instance.
+   *
+   * @param tajoConf Tajo system property.
+   * @return
+   * @throws IOException
+   */
   public static FileStorageManager getFileStorageManager(TajoConf tajoConf) 
throws IOException {
     return getFileStorageManager(tajoConf, null);
   }
 
+  /**
+   * Returns FileStorageManager instance and sets WAREHOUSE_DIR property in 
tajoConf with warehousePath parameter.
+   *
+   * @param tajoConf Tajo system property.
+   * @param warehousePath The warehouse directory to be set in the tajoConf.
+   * @return
+   * @throws IOException
+   */
   public static FileStorageManager getFileStorageManager(TajoConf tajoConf, 
Path warehousePath) throws IOException {
     URI uri;
     TajoConf copiedConf = new TajoConf(tajoConf);
@@ -101,22 +272,58 @@ public abstract class StorageManager {
     return (FileStorageManager) getStorageManager(copiedConf, StoreType.CSV, 
key);
   }
 
+  /**
+   * Returns the proper StorageManager instance according to the storeType.
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Storage type
+   * @return
+   * @throws IOException
+   */
+  public static StorageManager getStorageManager(TajoConf tajoConf, String 
storeType) throws IOException {
+    if ("HBASE".equals(storeType)) {
+      return getStorageManager(tajoConf, StoreType.HBASE);
+    } else {
+      return getStorageManager(tajoConf, StoreType.CSV);
+    }
+  }
+
+  /**
+   * Returns the proper StorageManager instance according to the storeType.
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Storage type
+   * @return
+   * @throws IOException
+   */
   public static StorageManager getStorageManager(TajoConf tajoConf, StoreType 
storeType) throws IOException {
     return getStorageManager(tajoConf, storeType, null);
   }
 
+  /**
+   * Returns the proper StorageManager instance according to the storeType
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Storage type
+   * @param managerKey Key that can identify each storage manager(may be a 
path)
+   * @return
+   * @throws IOException
+   */
   public static synchronized StorageManager getStorageManager (
-      TajoConf conf, StoreType storeType, String managerKey) throws 
IOException {
+      TajoConf tajoConf, StoreType storeType, String managerKey) throws 
IOException {
     synchronized (storageManagers) {
       String storeKey = storeType + managerKey;
       StorageManager manager = storageManagers.get(storeKey);
       if (manager == null) {
         switch (storeType) {
+          case HBASE:
+            manager = new HBaseStorageManager(storeType);
+            break;
           default:
-            manager = new FileStorageManager();
+            manager = new FileStorageManager(storeType);
         }
 
-        manager.init(conf);
+        manager.init(tajoConf);
         storageManagers.put(storeKey, manager);
       }
 
@@ -124,27 +331,121 @@ public abstract class StorageManager {
     }
   }
 
+  /**
+   * Returns Scanner instance.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target Columns which are selected.
+   * @return Scanner instance
+   * @throws IOException
+   */
   public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto 
fragment, Schema target) throws IOException {
-    return getScanner(meta, schema, FragmentConvertor.convert(conf, 
meta.getStoreType(), fragment), target);
+    return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), 
target);
   }
 
+  /**
+   * Returns Scanner instance.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @return Scanner instance
+   * @throws IOException
+   */
   public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) 
throws IOException {
     return getScanner(meta, schema, fragment, schema);
   }
 
-  public Appender getAppender(TableMeta meta, Schema schema, Path path)
+  /**
+   * Returns Scanner instance.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target The output schema
+   * @return Scanner instance
+   * @throws IOException
+   */
+  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, 
Schema target) throws IOException {
+    if (fragment.isEmpty()) {
+      Scanner scanner = new NullScanner(conf, schema, meta, fragment);
+      scanner.setTarget(target.toArray());
+
+      return scanner;
+    }
+
+    Scanner scanner;
+
+    Class<? extends Scanner> scannerClass = 
getScannerClass(meta.getStoreType());
+    scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment);
+    if (scanner.isProjectable()) {
+      scanner.setTarget(target.toArray());
+    }
+
+    return scanner;
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param conf The system property
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target The output schema
+   * @return Scanner instance
+   * @throws IOException
+   */
+  public static synchronized SeekableScanner getSeekableScanner(
+      TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, 
Schema target) throws IOException {
+    return (SeekableScanner)getStorageManager(conf, 
meta.getStoreType()).getScanner(meta, schema, fragment, target);
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param conf The system property
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param path The data file path
+   * @return Scanner instance
+   * @throws IOException
+   */
+  public static synchronized SeekableScanner getSeekableScanner(
+      TajoConf conf, TableMeta meta, Schema schema, Path path) throws 
IOException {
+
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus status = fs.getFileStatus(path);
+    FileFragment fragment = new FileFragment(path.getName(), path, 0, 
status.getLen());
+
+    return getSeekableScanner(conf, meta, schema, fragment, schema);
+  }
+
+  /**
+   * Returns Appender instance.
+   * @param queryContext Query property.
+   * @param taskAttemptId Task id.
+   * @param meta Table meta data.
+   * @param schema Output schema.
+   * @param workDir Working directory
+   * @return Appender instance
+   * @throws IOException
+   */
+  public Appender getAppender(OverridableConf queryContext,
+                              QueryUnitAttemptId taskAttemptId, TableMeta 
meta, Schema schema, Path workDir)
       throws IOException {
     Appender appender;
 
-    Class<? extends FileAppender> appenderClass;
+    Class<? extends Appender> appenderClass;
 
     String handlerName = meta.getStoreType().name().toLowerCase();
     appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
     if (appenderClass == null) {
       appenderClass = conf.getClass(
           String.format("tajo.storage.appender-handler.%s.class",
-              meta.getStoreType().name().toLowerCase()), null,
-          FileAppender.class);
+              meta.getStoreType().name().toLowerCase()), null, Appender.class);
       APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
     }
 
@@ -152,27 +453,21 @@ public abstract class StorageManager {
       throw new IOException("Unknown Storage Type: " + meta.getStoreType());
     }
 
-    appender = newAppenderInstance(appenderClass, conf, meta, schema, path);
+    appender = newAppenderInstance(appenderClass, conf, taskAttemptId, meta, 
schema, workDir);
 
     return appender;
   }
 
-  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
-      Configuration.class,
-      Schema.class,
-      TableMeta.class,
-      Fragment.class
-  };
-
-  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
-      Configuration.class,
-      Schema.class,
-      TableMeta.class,
-      Path.class
-  };
-
   /**
-   * create a scanner instance.
+   * Creates a scanner instance.
+   *
+   * @param theClass Concrete class of scanner
+   * @param conf System property
+   * @param schema Input schema
+   * @param meta Table meta data
+   * @param fragment The fragment for scanning
+   * @param <T>
+   * @return The scanner instance
    */
   public static <T> T newScannerInstance(Class<T> theClass, Configuration 
conf, Schema schema, TableMeta meta,
                                          Fragment fragment) {
@@ -193,10 +488,19 @@ public abstract class StorageManager {
   }
 
   /**
-   * create a scanner instance.
+   * Creates a scanner instance.
+   *
+   * @param theClass Concrete class of scanner
+   * @param conf System property
+   * @param taskAttemptId Task id
+   * @param meta Table meta data
+   * @param schema Input schema
+   * @param workDir Working directory
+   * @param <T>
+   * @return The scanner instance
    */
-  public static <T> T newAppenderInstance(Class<T> theClass, Configuration 
conf, TableMeta meta, Schema schema,
-                                          Path path) {
+  public static <T> T newAppenderInstance(Class<T> theClass, Configuration 
conf, QueryUnitAttemptId taskAttemptId,
+                                          TableMeta meta, Schema schema, Path 
workDir) {
     T result;
     try {
       Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
@@ -205,7 +509,7 @@ public abstract class StorageManager {
         meth.setAccessible(true);
         CONSTRUCTOR_CACHE.put(theClass, meth);
       }
-      result = meth.newInstance(new Object[]{conf, schema, meta, path});
+      result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, 
meta, workDir});
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -213,6 +517,13 @@ public abstract class StorageManager {
     return result;
   }
 
+  /**
+   * Return the Scanner class for the StoreType that is defined in 
storage-default.xml.
+   *
+   * @param storeType store type
+   * @return The Scanner class
+   * @throws IOException
+   */
   public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType 
storeType) throws IOException {
     String handlerName = storeType.name().toLowerCase();
     Class<? extends Scanner> scannerClass = 
SCANNER_HANDLER_CACHE.get(handlerName);
@@ -229,37 +540,387 @@ public abstract class StorageManager {
     return scannerClass;
   }
 
-  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, 
Schema target) throws IOException {
-    if (fragment.isEmpty()) {
-      Scanner scanner = new NullScanner(conf, schema, meta, fragment);
-      scanner.setTarget(target.toArray());
+  /**
+   * Return length of the fragment.
+   * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from 
the configuration.
+   *
+   * @param conf Tajo system property
+   * @param fragment Fragment
+   * @return
+   */
+  public static long getFragmentLength(TajoConf conf, Fragment fragment) {
+    if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
+      return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
+    } else {
+      return fragment.getLength();
+    }
+  }
 
-      return scanner;
+  /**
+   * It is called after making logical plan. Storage manager should verify the 
schema for inserting.
+   *
+   * @param tableDesc The table description of insert target.
+   * @param outSchema  The output schema of select query for inserting.
+   * @throws IOException
+   */
+  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) 
throws IOException {
+    // nothing to do
+  }
+
+  /**
+   * Returns the list of storage specified rewrite rules.
+   * This values are used by LogicalOptimizer.
+   *
+   * @param queryContext The query property
+   * @param tableDesc The description of the target table.
+   * @return The list of storage specified rewrite rules
+   * @throws IOException
+   */
+  public List<RewriteRule> getRewriteRules(OverridableConf queryContext, 
TableDesc tableDesc) throws IOException {
+    return null;
+  }
+
+  /**
+   * Finalizes result data. Tajo stores result data in the staging directory.
+   * If the query fails, clean up the staging directory.
+   * Otherwise the query is successful, move to the final directory from the 
staging directory.
+   *
+   * @param queryContext The query property
+   * @param finalEbId The final execution block id
+   * @param plan The query plan
+   * @param schema The final output schema
+   * @param tableDesc The description of the target table
+   * @return Saved path
+   * @throws IOException
+   */
+  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId 
finalEbId,
+                               LogicalPlan plan, Schema schema,
+                               TableDesc tableDesc) throws IOException {
+    return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, 
true);
+  }
+
+  /**
+   * Finalizes result data. Tajo stores result data in the staging directory.
+   * If the query fails, clean up the staging directory.
+   * Otherwise the query is successful, move to the final directory from the 
staging directory.
+   *
+   * @param queryContext The query property
+   * @param finalEbId The final execution block id
+   * @param plan The query plan
+   * @param schema The final output schema
+   * @param tableDesc The description of the target table
+   * @param changeFileSeq If true change result file name with max sequence.
+   * @return Saved path
+   * @throws IOException
+   */
+  protected Path commitOutputData(OverridableConf queryContext, 
ExecutionBlockId finalEbId,
+                               LogicalPlan plan, Schema schema,
+                               TableDesc tableDesc, boolean changeFileSeq) 
throws IOException {
+    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
+    Path stagingResultDir = new Path(stagingDir, 
TajoConstants.RESULT_DIR_NAME);
+    Path finalOutputDir;
+    if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
+      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
+      FileSystem fs = stagingResultDir.getFileSystem(conf);
+
+      if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT 
OVERWRITE INTO
+
+        // It moves the original table into the temporary location.
+        // Then it moves the new result table into the original table location.
+        // Upon failed, it recovers the original table if possible.
+        boolean movedToOldTable = false;
+        boolean committed = false;
+        Path oldTableDir = new Path(stagingDir, 
TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+
+        if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
+          // This is a map for existing non-leaf directory to rename. A key is 
current directory and a value is
+          // renaming directory.
+          Map<Path, Path> renameDirs = TUtil.newHashMap();
+          // This is a map for recovering existing partition directory. A key 
is current directory and a value is
+          // temporary directory to back up.
+          Map<Path, Path> recoveryDirs = TUtil.newHashMap();
+
+          try {
+            if (!fs.exists(finalOutputDir)) {
+              fs.mkdirs(finalOutputDir);
+            }
+
+            visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, 
stagingResultDir.toString(),
+                renameDirs, oldTableDir);
+
+            // Rename target partition directories
+            for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+              // Backup existing data files for recovering
+              if (fs.exists(entry.getValue())) {
+                String recoveryPathString = 
entry.getValue().toString().replaceAll(finalOutputDir.toString(),
+                    oldTableDir.toString());
+                Path recoveryPath = new Path(recoveryPathString);
+                fs.rename(entry.getValue(), recoveryPath);
+                fs.exists(recoveryPath);
+                recoveryDirs.put(entry.getValue(), recoveryPath);
+              }
+              // Delete existing directory
+              fs.delete(entry.getValue(), true);
+              // Rename staging directory to final output directory
+              fs.rename(entry.getKey(), entry.getValue());
+            }
+
+          } catch (IOException ioe) {
+            // Remove created dirs
+            for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+              fs.delete(entry.getValue(), true);
+            }
+
+            // Recovery renamed dirs
+            for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
+              fs.delete(entry.getValue(), true);
+              fs.rename(entry.getValue(), entry.getKey());
+            }
+            throw new IOException(ioe.getMessage());
+          }
+        } else {
+          try {
+            if (fs.exists(finalOutputDir)) {
+              fs.rename(finalOutputDir, oldTableDir);
+              movedToOldTable = fs.exists(oldTableDir);
+            } else { // if the parent does not exist, make its parent 
directory.
+              fs.mkdirs(finalOutputDir.getParent());
+            }
+
+            fs.rename(stagingResultDir, finalOutputDir);
+            committed = fs.exists(finalOutputDir);
+          } catch (IOException ioe) {
+            // recover the old table
+            if (movedToOldTable && !committed) {
+              fs.rename(oldTableDir, finalOutputDir);
+            }
+          }
+        }
+      } else {
+        String queryType = queryContext.get(QueryVars.COMMAND_TYPE);
+
+        if (queryType != null && queryType.equals(NodeType.INSERT.name())) { 
// INSERT INTO an existing table
+
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(3);
+
+          if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
+            for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+              if (eachFile.isFile()) {
+                LOG.warn("Partition table can't have file in a staging dir: " 
+ eachFile.getPath());
+                continue;
+              }
+              moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputDir, fmt, -1, changeFileSeq);
+            }
+          } else {
+            int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, 
false) + 1;
+            for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+              if (eachFile.getPath().getName().startsWith("_")) {
+                continue;
+              }
+              moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputDir, fmt, maxSeq++, changeFileSeq);
+            }
+          }
+          // Check that all files were moved, and remove empty directories
+          verifyAllFileMoved(fs, stagingResultDir);
+          FileStatus[] files = fs.listStatus(stagingResultDir);
+          if (files != null && files.length != 0) {
+            for (FileStatus eachFile: files) {
+              LOG.error("There are some unmoved files in staging dir:" + 
eachFile.getPath());
+            }
+          }
+        } else { // CREATE TABLE AS SELECT (CTAS)
+          fs.rename(stagingResultDir, finalOutputDir);
+          LOG.info("Moved from the staging dir to the output directory '" + 
finalOutputDir);
+        }
+      }
+    } else {
+      finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
     }
 
-    Scanner scanner;
+    return finalOutputDir;
+  }
 
-    Class<? extends Scanner> scannerClass = 
getScannerClass(meta.getStoreType());
-    scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment);
-    if (scanner.isProjectable()) {
-      scanner.setTarget(target.toArray());
+  /**
+   * Attach the sequence number to the output file name and then move the file
+   * into the final result path. Directories are handled recursively: each data
+   * file found under a staging sub-directory is renumbered against the files
+   * already present in the corresponding final sub-directory.
+   *
+   * @param fs FileSystem
+   * @param stagingResultDir The staging result dir
+   * @param fileStatus The staging file or directory to move
+   * @param finalOutputPath Final output path
+   * @param nf Number format used to render the sequence number
+   * @param fileSeq The sequence number (only used when changeFileSeq is true)
+   * @param changeFileSeq If true, the trailing sequence token of the file name is replaced with fileSeq
+   * @throws IOException if the staging layout is malformed or a target file already exists
+   */
+  private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
+                                          FileStatus fileStatus, Path 
finalOutputPath,
+                                          NumberFormat nf,
+                                          int fileSeq, boolean changeFileSeq) 
throws IOException {
+    if (fileStatus.isDirectory()) {
+      // Directory: recreate it under the final output path and recurse.
+      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
+      if (subPath != null) {
+        Path finalSubPath = new Path(finalOutputPath, subPath);
+        if (!fs.exists(finalSubPath)) {
+          fs.mkdirs(finalSubPath);
+        }
+        int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
+        for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
+          // Names starting with '_' are metadata files, not table data; skip them.
+          if (eachFile.getPath().getName().startsWith("_")) {
+            continue;
+          }
+          moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputPath, nf, ++maxSeq, changeFileSeq);
+        }
+      } else {
+        throw new IOException("Wrong staging dir:" + stagingResultDir + "," + 
fileStatus.getPath());
+      }
+    } else {
+      // Plain file: compute its final location, optionally renumber, then rename.
+      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
+      if (subPath != null) {
+        Path finalSubPath = new Path(finalOutputPath, subPath);
+        if (changeFileSeq) {
+          finalSubPath = new Path(finalSubPath.getParent(), 
replaceFileNameSeq(finalSubPath, fileSeq, nf));
+        }
+        if (!fs.exists(finalSubPath.getParent())) {
+          fs.mkdirs(finalSubPath.getParent());
+        }
+        // Never silently overwrite existing table data.
+        if (fs.exists(finalSubPath)) {
+          throw new IOException("Already exists data file:" + finalSubPath);
+        }
+        boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
+        if (success) {
+          LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
+              "to final output[" + finalSubPath + "]");
+        } else {
+          LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " 
+
+              "to final output[" + finalSubPath + "]");
+        }
+      }
    }
+  }
 
-    return scanner;
+  /**
+   * Removes the parent path prefix from a child path, yielding the child's
+   * path relative to the parent.
+   *
+   * @param parentPath the ancestor path used as the prefix to strip
+   * @param childPath the path expected to live under parentPath
+   * @return the sub-path of childPath relative to parentPath (without a
+   *         leading '/'), or null if childPath is not located under parentPath
+   */
+  private String extractSubPath(Path parentPath, Path childPath) {
+    String parentPathStr = parentPath.toUri().getPath();
+    String childPathStr = childPath.toUri().getPath();
+
+    if (parentPathStr.length() > childPathStr.length()) {
+      return null;
+    }
+
+    // The parent string must be a prefix (match at offset 0) of the child.
+    int index = childPathStr.indexOf(parentPathStr);
+    if (index != 0) {
+      return null;
+    }
+
+    return childPathStr.substring(parentPathStr.length() + 1);
   }
 
-  public static synchronized SeekableScanner getSeekableScanner(
-      TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, 
Schema target) throws IOException {
-    return (SeekableScanner)getStorageManager(conf, 
meta.getStoreType()).getScanner(meta, schema, fragment, target);
+  /**
+   * Attach the sequence number to a path's file name. The file name is
+   * expected to consist of exactly four '-'-separated tokens; the last token
+   * is replaced with the formatted sequence number.
+   *
+   * @param path Path whose file name is rewritten
+   * @param seq sequence number
+   * @param nf Number format used to render the sequence number
+   * @return New file name (not a full path) with the sequence number attached
+   * @throws IOException if the file name does not have exactly four tokens
+   */
+  private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) 
throws IOException {
+    String[] tokens = path.getName().split("-");
+    if (tokens.length != 4) {
+      throw new IOException("Wrong result file name:" + path);
+    }
+    return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + 
nf.format(seq);
   }
 
-  public static synchronized SeekableScanner getSeekableScanner(
-      TajoConf conf, TableMeta meta, Schema schema, Path path) throws 
IOException {
+  /**
+   * Make sure all files have been moved out of the staging directory.
+   * Sub-directories verified to be empty are deleted along the way.
+   *
+   * @param fs FileSystem
+   * @param stagingPath The staging directory
+   * @return true if no regular file remains anywhere under stagingPath
+   * @throws IOException
+   */
+  private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws 
IOException {
+    FileStatus[] files = fs.listStatus(stagingPath);
+    if (files != null && files.length != 0) {
+      for (FileStatus eachFile: files) {
+        if (eachFile.isFile()) {
+          // A leftover data file means the move was incomplete.
+          LOG.error("There are some unmoved files in staging dir:" + 
eachFile.getPath());
+          return false;
+        } else {
+          // Recurse; delete the sub-directory only once it is verified empty.
+          if (verifyAllFileMoved(fs, eachFile.getPath())) {
+            fs.delete(eachFile.getPath(), false);
+          } else {
+            return false;
+          }
+        }
+      }
+    }

-    FileSystem fs = path.getFileSystem(conf);
-    FileStatus status = fs.getFileStatus(path);
-    FileFragment fragment = new FileFragment(path.getName(), path, 0, 
status.getLen());
+    return true;
+  }
 
-    return getSeekableScanner(conf, meta, schema, fragment, schema);
+  /**
+   * Recursively visits a partitioned staging directory tree and builds a map
+   * from staging partition directories to their final output locations.
+   * For every staging sub-directory a matching recovery directory is created
+   * under oldTableDir; deepest-level partition directories are recorded in
+   * renameDirs for a later rename, while intermediate directories are created
+   * directly under the output path.
+   *
+   * @param fs file system to operate on
+   * @param stagingPath current staging directory being visited
+   * @param outputPath final output directory of the table
+   * @param stagingParentPathString string form of the staging root that is replaced in each visited path
+   * @param renameDirs out-parameter collecting staging-dir to final-dir rename pairs
+   * @param oldTableDir directory keeping the previous table contents for recovery
+   * @throws IOException if listing or creating directories fails
+   */
+  private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path 
outputPath,
+                                         String stagingParentPathString,
+                                         Map<Path, Path> renameDirs, Path 
oldTableDir) throws IOException {
+    FileStatus[] files = fs.listStatus(stagingPath);
+
+    for(FileStatus eachFile : files) {
+      if (eachFile.isDirectory()) {
+        Path oldPath = eachFile.getPath();
+
+        // Make recover directory.
+        // NOTE(review): replaceAll interprets stagingParentPathString as a regex;
+        // paths containing regex metacharacters may mis-replace — confirm, or
+        // consider String.replace instead.
+        String recoverPathString = 
oldPath.toString().replaceAll(stagingParentPathString,
+            oldTableDir.toString());
+        Path recoveryPath = new Path(recoverPathString);
+        if (!fs.exists(recoveryPath)) {
+          fs.mkdirs(recoveryPath);
+        }
+
+        visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, 
stagingParentPathString,
+            renameDirs, oldTableDir);
+        // Find last order partition for renaming
+        String newPathString = 
oldPath.toString().replaceAll(stagingParentPathString,
+            outputPath.toString());
+        Path newPath = new Path(newPathString);
+        if (!isLeafDirectory(fs, eachFile.getPath())) {
+          // Deepest-level partition directory: schedule it for renaming.
+          renameDirs.put(eachFile.getPath(), newPath);
+        } else {
+          // Intermediate directory: just make sure it exists in the output tree.
+          if (!fs.exists(newPath)) {
+            fs.mkdirs(newPath);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * NOTE(review): despite its name, this returns true when {@code path}
+   * contains at least one sub-directory — i.e. when it is NOT a leaf
+   * directory. The caller (visitPartitionedDirectory) negates the result, so
+   * overall behavior is consistent, but the name reads inverted; confirm
+   * intent before renaming.
+   */
+  private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException 
{
+    boolean retValue = false;
+
+    FileStatus[] files = fs.listStatus(path);
+    for (FileStatus file : files) {
+      if (fs.isDirectory(file.getPath())) {
+        retValue = true;
+        break;
+      }
+    }
+
+    return retValue;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java
new file mode 100644
index 0000000..6816d08
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+/**
+ * Describes capabilities of a storage type as a simple mutable bean.
+ */
+public class StorageProperty {
+  // whether the storage supports INSERT INTO an existing table
+  private boolean supportsInsertInto;
+  // whether rows must be supplied in sorted order on insert
+  // (presumably for storages like HBase that require row-key order — confirm)
+  private boolean sortedInsert;
+
+  public boolean isSupportsInsertInto() {
+    return supportsInsertInto;
+  }
+
+  public void setSupportsInsertInto(boolean supportsInsertInto) {
+    this.supportsInsertInto = supportsInsertInto;
+  }
+
+  public boolean isSortedInsert() {
+    return sortedInsert;
+  }
+
+  public void setSortedInsert(boolean sortedInsert) {
+    this.sortedInsert = sortedInsert;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index 6af8da0..9e1e7ea 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.catalog.TableMeta;
@@ -58,12 +59,13 @@ public class AvroAppender extends FileAppender {
    * @param conf Configuration properties.
    * @param schema The table schema.
    * @param meta The table metadata.
-   * @param path The path of the Parquet file to write to.
+   * @param workDir The path of the Avro file to write to.
    */
   public AvroAppender(Configuration conf,
+                      QueryUnitAttemptId taskAttemptId,
                       org.apache.tajo.catalog.Schema schema,
-                      TableMeta meta, Path path) throws IOException {
-    super(conf, schema, meta, path);
+                      TableMeta meta, Path workDir) throws IOException {
+    super(conf, taskAttemptId, schema, meta, workDir);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
index dcd9f0a..4a83dbf 100644
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
@@ -24,6 +24,7 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
@@ -37,8 +38,8 @@ import static 
org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 public class FileFragment implements Fragment, Comparable<FileFragment>, 
Cloneable {
   @Expose private String tableName; // required
   @Expose private Path uri; // required
-  @Expose private Long startOffset; // required
-  @Expose private Long length; // required
+  @Expose public Long startOffset; // required
+  @Expose public Long length; // required
 
   private String[] hosts; // Datanode hostnames
   @Expose private int[] diskIds;
@@ -229,6 +230,7 @@ public class FileFragment implements Fragment, 
Comparable<FileFragment>, Cloneab
 
     FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
     fragmentBuilder.setId(this.tableName);
+    fragmentBuilder.setStoreType(StoreType.CSV.name());
     fragmentBuilder.setContents(builder.buildPartial().toByteString());
     return fragmentBuilder.build();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
index 0315a8d..07720c7 100644
--- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 
 @ThreadSafe
 public class FragmentConvertor {
@@ -47,18 +46,17 @@ public class FragmentConvertor {
    */
   private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class 
};
 
-  public static Class<? extends Fragment> getFragmentClass(Configuration conf, 
StoreType storeType)
-      throws IOException {
-    String handlerName = storeType.name().toLowerCase();
-    Class<? extends Fragment> fragmentClass = 
CACHED_FRAGMENT_CLASSES.get(handlerName);
+  public static Class<? extends Fragment> getFragmentClass(Configuration conf, 
String storeType)
+  throws IOException {
+    Class<? extends Fragment> fragmentClass = 
CACHED_FRAGMENT_CLASSES.get(storeType.toLowerCase());
     if (fragmentClass == null) {
       fragmentClass = conf.getClass(
-          String.format("tajo.storage.fragment.%s.class", 
storeType.name().toLowerCase()), null, Fragment.class);
-      CACHED_FRAGMENT_CLASSES.put(handlerName, fragmentClass);
+          String.format("tajo.storage.fragment.%s.class", 
storeType.toLowerCase()), null, Fragment.class);
+      CACHED_FRAGMENT_CLASSES.put(storeType.toLowerCase(), fragmentClass);
     }
 
     if (fragmentClass == null) {
-      throw new IOException("No such a fragment for " + storeType.name());
+      throw new IOException("No such a fragment for " + 
storeType.toLowerCase());
     }
 
     return fragmentClass;
@@ -81,11 +79,11 @@ public class FragmentConvertor {
     return result;
   }
 
-  public static <T extends Fragment> T convert(Configuration conf, StoreType 
storeType, FragmentProto fragment)
+  public static <T extends Fragment> T convert(Configuration conf, 
FragmentProto fragment)
       throws IOException {
-    Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, storeType);
+    Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, 
fragment.getStoreType().toLowerCase());
     if (fragmentClass == null) {
-      throw new IOException("No such a fragment class for " + 
storeType.name());
+      throw new IOException("No such a fragment class for " + 
fragment.getStoreType());
     }
     return convert(fragmentClass, fragment);
   }
@@ -102,14 +100,13 @@ public class FragmentConvertor {
     return list;
   }
 
-  public static <T extends Fragment> List<T> convert(Configuration conf, 
StoreType storeType,
-                                                           
FragmentProto...fragments) throws IOException {
+  public static <T extends Fragment> List<T> convert(Configuration conf, 
FragmentProto...fragments) throws IOException {
     List<T> list = Lists.newArrayList();
     if (fragments == null) {
       return list;
     }
     for (FragmentProto proto : fragments) {
-      list.add((T) convert(conf, storeType, proto));
+      list.add((T) convert(conf, proto));
     }
     return list;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
new file mode 100644
index 0000000..8615235
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.TUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstract class for HBase appender.
+ * It precomputes the column-to-HBase mapping (row-key columns, binary vs. text
+ * encoding, and '<cfname>:key:'/'<cfname>:value:' mappings) and provides
+ * helpers that turn a Tuple into an HBase row key and an array of KeyValues;
+ * concrete subclasses decide how those KeyValues are written out.
+ */
+public abstract class AbstractHBaseAppender implements Appender {
+  protected Configuration conf;
+  protected Schema schema;
+  protected TableMeta meta;
+  protected QueryUnitAttemptId taskAttemptId;
+  protected Path stagingDir;
+  // set by init(); guards against double initialization
+  protected boolean inited = false;
+
+  protected ColumnMapping columnMapping;
+  protected TableStatistics stats;
+  protected boolean enabledStats;
+
+  // number of columns in the table schema
+  protected int columnNum;
+
+  // per-column mapping data, all derived from ColumnMapping in init()
+  protected byte[][][] mappingColumnFamilies;
+  protected boolean[] isBinaryColumns;
+  protected boolean[] isRowKeyMappings;
+  protected boolean[] isColumnKeys;
+  protected boolean[] isColumnValues;
+  protected int[] rowKeyFieldIndexes;
+  protected int[] rowkeyColumnIndexes;
+  protected char rowKeyDelimiter;
+
+  // the following four variables are used for '<cfname>:key:' or '<cfname>:value:' mapping
+  protected int[] columnKeyValueDataIndexes;
+  protected byte[][] columnKeyDatas;
+  protected byte[][] columnValueDatas;
+  protected byte[][] columnKeyCfNames;
+
+  // scratch array refilled by readKeyValues() for every tuple
+  protected KeyValue[] keyValues;
+
+  public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId 
taskAttemptId,
+                       Schema schema, TableMeta meta, Path stagingDir) {
+    this.conf = conf;
+    this.schema = schema;
+    this.meta = meta;
+    this.stagingDir = stagingDir;
+    this.taskAttemptId = taskAttemptId;
+  }
+
+  /**
+   * Builds all mapping tables used by getRowKeyBytes() and readKeyValues().
+   * Must be called exactly once before appending.
+   */
+  @Override
+  public void init() throws IOException {
+    if (inited) {
+      throw new IllegalStateException("FileAppender is already initialized.");
+    }
+    inited = true;
+    if (enabledStats) {
+      stats = new TableStatistics(this.schema);
+    }
+    columnMapping = new ColumnMapping(schema, meta);
+
+    mappingColumnFamilies = columnMapping.getMappingColumns();
+
+    // Collect the schema indexes of the row-key columns.
+    isRowKeyMappings = columnMapping.getIsRowKeyMappings();
+    List<Integer> rowkeyColumnIndexList = new ArrayList<Integer>();
+    for (int i = 0; i < isRowKeyMappings.length; i++) {
+      if (isRowKeyMappings[i]) {
+        rowkeyColumnIndexList.add(i);
+      }
+    }
+    rowkeyColumnIndexes = TUtil.toArray(rowkeyColumnIndexList);
+
+    isBinaryColumns = columnMapping.getIsBinaryColumns();
+    isColumnKeys = columnMapping.getIsColumnKeys();
+    isColumnValues = columnMapping.getIsColumnValues();
+    rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
+    rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
+
+    this.columnNum = schema.size();
+
+    // In the case of '<cfname>:key:' or '<cfname>:value:' the KeyValue object
+    // should be set with the qualifier and value which are mapped to the same
+    // column family, so columns sharing a family share one slot index here.
+    columnKeyValueDataIndexes = new int[isColumnKeys.length];
+    int index = 0;
+    int numKeyValues = 0;
+    Map<String, Integer> cfNameIndexMap = new HashMap<String, Integer>();
+    for (int i = 0; i < isColumnKeys.length; i++) {
+      if (isRowKeyMappings[i]) {
+        continue;
+      }
+      if (isColumnKeys[i] || isColumnValues[i]) {
+        String cfName = new String(mappingColumnFamilies[i][0]);
+        if (!cfNameIndexMap.containsKey(cfName)) {
+          cfNameIndexMap.put(cfName, index);
+          columnKeyValueDataIndexes[i] = index;
+          index++;
+          numKeyValues++;
+        } else {
+          columnKeyValueDataIndexes[i] = cfNameIndexMap.get(cfName);
+        }
+      } else {
+        numKeyValues++;
+      }
+    }
+    columnKeyCfNames = new byte[cfNameIndexMap.size()][];
+    for (Map.Entry<String, Integer> entry: cfNameIndexMap.entrySet()) {
+      columnKeyCfNames[entry.getValue()] = entry.getKey().getBytes();
+    }
+    columnKeyDatas = new byte[cfNameIndexMap.size()][];
+    columnValueDatas = new byte[cfNameIndexMap.size()][];
+
+    keyValues = new KeyValue[numKeyValues];
+  }
+
+  // Reused scratch buffer for composing multi-column row keys.
+  // NOTE(review): shared mutable state — appender instances are presumably not
+  // meant to be shared across threads; confirm.
+  private ByteArrayOutputStream bout = new ByteArrayOutputStream();
+
+  /**
+   * Serializes the row-key column(s) of a tuple into the HBase row key bytes.
+   * Multiple row-key columns are joined with rowKeyDelimiter.
+   */
+  protected byte[] getRowKeyBytes(Tuple tuple) throws IOException {
+    Datum datum;
+    byte[] rowkey;
+    if (rowkeyColumnIndexes.length > 1) {
+      bout.reset();
+      for (int i = 0; i < rowkeyColumnIndexes.length; i++) {
+        datum = tuple.get(rowkeyColumnIndexes[i]);
+        if (isBinaryColumns[rowkeyColumnIndexes[i]]) {
+          rowkey = 
HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]),
 datum);
+        } else {
+          rowkey = 
HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]),
 datum);
+        }
+        bout.write(rowkey);
+        if (i < rowkeyColumnIndexes.length - 1) {
+          bout.write(rowKeyDelimiter);
+        }
+      }
+      rowkey = bout.toByteArray();
+    } else {
+      // Single row-key column: serialize it directly, no delimiter needed.
+      int index = rowkeyColumnIndexes[0];
+      datum = tuple.get(index);
+      if (isBinaryColumns[index]) {
+        rowkey = 
HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum);
+      } else {
+        rowkey = 
HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum);
+      }
+    }
+
+    return rowkey;
+  }
+
+  /**
+   * Fills the keyValues scratch array from the tuple: one KeyValue per
+   * ordinary non-row-key column, plus one combined KeyValue per column family
+   * for '<cfname>:key:'/'<cfname>:value:' column pairs.
+   */
+  protected void readKeyValues(Tuple tuple, byte[] rowkey) throws IOException {
+    int keyValIndex = 0;
+    for (int i = 0; i < columnNum; i++) {
+      if (isRowKeyMappings[i]) {
+        continue;
+      }
+      Datum datum = tuple.get(i);
+      byte[] value;
+      if (isBinaryColumns[i]) {
+        value = 
HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
+      } else {
+        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), 
datum);
+      }
+
+      if (isColumnKeys[i]) {
+        columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
+      } else if (isColumnValues[i]) {
+        columnValueDatas[columnKeyValueDataIndexes[i]] = value;
+      } else {
+        keyValues[keyValIndex] = new KeyValue(rowkey, 
mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
+        keyValIndex++;
+      }
+    }
+
+    // Emit the combined key/value entries, one per column family.
+    for (int i = 0; i < columnKeyDatas.length; i++) {
+      keyValues[keyValIndex++] = new KeyValue(rowkey, columnKeyCfNames[i], 
columnKeyDatas[i], columnValueDatas[i]);
+    }
+  }
+
+  @Override
+  public void enableStats() {
+    enabledStats = true;
+  }
+
+  /** Returns collected table statistics, or null when stats are disabled. */
+  @Override
+  public TableStats getStats() {
+    if (enabledStats) {
+      return stats.getTableStat();
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
new file mode 100644
index 0000000..8044494
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.logical.SortNode.SortPurpose;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
+
+/**
+ * A logical-plan rewrite rule that injects a SortNode beneath an INSERT node so
+ * that rows reach the target storage in the storage-specified sort order
+ * (SortPurpose.STORAGE_SPECIFIED), e.g. for storages requiring key-ordered input.
+ */
+public class AddSortForInsertRewriter implements RewriteRule {
+  // schema indexes of the sort columns within the target table's schema
+  private int[] sortColumnIndexes;
+  // the storage-specified sort key columns
+  private Column[] sortColumns;
+
+  public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) {
+    this.sortColumns = sortColumns;
+    this.sortColumnIndexes = new int[sortColumns.length];
+
+    Schema tableSchema = tableDesc.getSchema();
+    for (int i = 0; i < sortColumns.length; i++) {
+      sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "AddSortForInsertRewriter";
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    // Applicable whenever the plan targets a concrete storage type.
+    StoreType storeType = PlannerUtil.getStoreType(plan);
+    return storeType != null;
+  }
+
+  /**
+   * Splices a STORAGE_SPECIFIED SortNode between the INSERT node and its child.
+   *
+   * @throws PlanningException if a sort column cannot be resolved in the child's output schema
+   */
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+    UnaryNode insertNode = rootNode.getChild();
+    LogicalNode childNode = insertNode.getChild();
+
+    // The sort node adopts the child's output schema unchanged.
+    Schema sortSchema = childNode.getOutSchema();
+    SortNode sortNode = plan.createNode(SortNode.class);
+    sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
+    sortNode.setInSchema(sortSchema);
+    sortNode.setOutSchema(sortSchema);
+
+    SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
+    for (int i = 0; i < sortColumnIndexes.length; i++) {
+      Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
+      if (sortColumn == null) {
+        throw new PlanningException("Can't find proper sort column:" + sortColumns[i]);
+      }
+      // Ascending, nulls-first sort on each storage-specified key column.
+      sortSpecs[i] = new SortSpec(sortColumn, true, true);
+    }
+    sortNode.setSortSpecs(sortSpecs);
+
+    sortNode.setChild(insertNode.getChild());
+    insertNode.setChild(sortNode);
+    plan.getRootBlock().registerNode(sortNode);
+
+    return plan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
new file mode 100644
index 0000000..f80bd5e
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.util.BytesUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ColumnMapping {
+  private TableMeta tableMeta;
+  private Schema schema;
+  private char rowKeyDelimiter;
+
+  private String hbaseTableName;
+
+  private int[] rowKeyFieldIndexes;
+  private boolean[] isRowKeyMappings;
+  private boolean[] isBinaryColumns;
+  private boolean[] isColumnKeys;
+  private boolean[] isColumnValues;
+
+  // schema order -> 0: cf name, 1: column name -> name bytes
+  private byte[][][] mappingColumns;
+
+  private int numRowKeys;
+
+  public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException {
+    this.schema = schema;
+    this.tableMeta = tableMeta;
+
+    init();
+  }
+
+  private void init() throws IOException {
+    hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY);
+    String delim = 
tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim();
+    if (delim.length() > 0) {
+      rowKeyDelimiter = delim.charAt(0);
+    }
+    isRowKeyMappings = new boolean[schema.size()];
+    rowKeyFieldIndexes = new int[schema.size()];
+    isBinaryColumns = new boolean[schema.size()];
+    isColumnKeys = new boolean[schema.size()];
+    isColumnValues = new boolean[schema.size()];
+
+    mappingColumns = new byte[schema.size()][][];
+
+    for (int i = 0; i < schema.size(); i++) {
+      rowKeyFieldIndexes[i] = -1;
+    }
+
+    String columnMapping = 
tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, "");
+    if (columnMapping == null || columnMapping.isEmpty()) {
+      throw new IOException("'columns' property is required.");
+    }
+
+    String[] columnMappingTokens = columnMapping.split(",");
+
+    if (columnMappingTokens.length != schema.getColumns().size()) {
+      throw new IOException("The number of mapped HBase columns is great than 
the number of Tajo table columns");
+    }
+
+    int index = 0;
+    for (String eachToken: columnMappingTokens) {
+      mappingColumns[index] = new byte[2][];
+
+      byte[][] mappingTokens = 
BytesUtils.splitPreserveAllTokens(eachToken.trim().getBytes(), ':');
+
+      if (mappingTokens.length == 3) {
+        if (mappingTokens[0].length == 0) {
+          // cfname
+          throw new IOException(eachToken + " 'column' attribute should be 
'<cfname>:key:' or '<cfname>:key:#b' " +
+              "or '<cfname>:value:' or '<cfname>:value:#b'");
+        }
+        //<cfname>:key: or <cfname>:value:
+        if (mappingTokens[2].length != 0) {
+          String binaryOption = new String(mappingTokens[2]);
+          if ("#b".equals(binaryOption)) {
+            isBinaryColumns[index] = true;
+          } else {
+            throw new IOException(eachToken + " 'column' attribute should be 
'<cfname>:key:' or '<cfname>:key:#b' " +
+                "or '<cfname>:value:' or '<cfname>:value:#b'");
+          }
+        }
+        mappingColumns[index][0] = mappingTokens[0];
+        String keyOrValue = new String(mappingTokens[1]);
+        if 
(HBaseStorageConstants.KEY_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
+          isColumnKeys[index] = true;
+        } else if 
(HBaseStorageConstants.VALUE_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
+          isColumnValues[index] = true;
+        } else {
+          throw new IOException(eachToken + " 'column' attribute should be 
'<cfname>:key:' or '<cfname>:value:'");
+        }
+      } else if (mappingTokens.length == 2) {
+        //<cfname>: or <cfname>:<qualifier> or :key
+        String cfName = new String(mappingTokens[0]);
+        String columnName = new String(mappingTokens[1]);
+        RowKeyMapping rowKeyMapping = getRowKeyMapping(cfName, columnName);
+        if (rowKeyMapping != null) {
+          isRowKeyMappings[index] = true;
+          numRowKeys++;
+          isBinaryColumns[index] = rowKeyMapping.isBinary();
+          if (!cfName.isEmpty()) {
+            if (rowKeyDelimiter == 0) {
+              throw new IOException("hbase.rowkey.delimiter is required.");
+            }
+            rowKeyFieldIndexes[index] = Integer.parseInt(cfName);
+          } else {
+            rowKeyFieldIndexes[index] = -1; //rowkey is mapped a single column.
+          }
+        } else {
+          if (cfName.isEmpty()) {
+            throw new IOException(eachToken + " 'column' attribute should be 
'<cfname>:key:' or '<cfname>:value:'");
+          }
+          if (cfName != null) {
+            mappingColumns[index][0] = Bytes.toBytes(cfName);
+          }
+
+          if (columnName != null && !columnName.isEmpty()) {
+            String[] columnNameTokens = columnName.split("#");
+            if (columnNameTokens[0].isEmpty()) {
+              mappingColumns[index][1] = null;
+            } else {
+              mappingColumns[index][1] = Bytes.toBytes(columnNameTokens[0]);
+            }
+            if (columnNameTokens.length == 2 && 
"b".equals(columnNameTokens[1])) {
+              isBinaryColumns[index] = true;
+            }
+          }
+        }
+      } else {
+        throw new IOException(eachToken + " 'column' attribute 
'[cfname]:[qualfier]:'");
+      }
+
+      index++;
+    } // for loop
+  }
+
+  public List<String> getColumnFamilyNames() {
+    List<String> cfNames = new ArrayList<String>();
+
+    for (byte[][] eachCfName: mappingColumns) {
+      if (eachCfName != null && eachCfName.length > 0 && eachCfName[0] != 
null) {
+        String cfName = new String(eachCfName[0]);
+        if (!cfNames.contains(cfName)) {
+          cfNames.add(cfName);
+        }
+      }
+    }
+
+    return cfNames;
+  }
+
+  private RowKeyMapping getRowKeyMapping(String cfName, String columnName) {
+    if (columnName == null || columnName.isEmpty()) {
+      return null;
+    }
+
+    String[] tokens = columnName.split("#");
+    if (!tokens[0].equalsIgnoreCase(HBaseStorageConstants.KEY_COLUMN_MAPPING)) 
{
+      return null;
+    }
+
+    RowKeyMapping rowKeyMapping = new RowKeyMapping();
+
+    if (tokens.length == 2 && "b".equals(tokens[1])) {
+      rowKeyMapping.setBinary(true);
+    }
+
+    if (cfName != null && !cfName.isEmpty()) {
+      rowKeyMapping.setKeyFieldIndex(Integer.parseInt(cfName));
+    }
+    return rowKeyMapping;
+  }
+
+  public char getRowKeyDelimiter() {
+    return rowKeyDelimiter;
+  }
+
+  public int[] getRowKeyFieldIndexes() {
+    return rowKeyFieldIndexes;
+  }
+
+  public boolean[] getIsRowKeyMappings() {
+    return isRowKeyMappings;
+  }
+
+  public byte[][][] getMappingColumns() {
+    return mappingColumns;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public boolean[] getIsBinaryColumns() {
+    return isBinaryColumns;
+  }
+
+  public String getHbaseTableName() {
+    return hbaseTableName;
+  }
+
+  public boolean[] getIsColumnKeys() {
+    return isColumnKeys;
+  }
+
+  public int getNumRowKeys() {
+    return numRowKeys;
+  }
+
+  public boolean[] getIsColumnValues() {
+    return isColumnValues;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
new file mode 100644
index 0000000..c05c5bb
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+
+public class HBaseBinarySerializerDeserializer {
+
+  public static Datum deserialize(Column col, byte[] bytes) throws IOException 
{
+    Datum datum;
+    switch (col.getDataType().getType()) {
+      case INT1:
+      case INT2:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : 
DatumFactory.createInt2(Bytes.toShort(bytes));
+        break;
+      case INT4:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : 
DatumFactory.createInt4(Bytes.toInt(bytes));
+        break;
+      case INT8:
+        if (bytes.length == 4) {
+          datum = bytes == null || bytes.length == 0 ? NullDatum.get() : 
DatumFactory.createInt8(Bytes.toInt(bytes));
+        } else {
+          datum = bytes == null || bytes.length == 0 ? NullDatum.get() : 
DatumFactory.createInt8(Bytes.toLong(bytes));
+        }
+        break;
+      case FLOAT4:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : 
DatumFactory.createFloat4(Bytes.toFloat(bytes));
+        break;
+      case FLOAT8:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : 
DatumFactory.createFloat8(Bytes.toDouble(bytes));
+        break;
+      case TEXT:
+        datum = bytes == null ? NullDatum.get() : 
DatumFactory.createText(bytes);
+        break;
+      default:
+        datum = NullDatum.get();
+        break;
+    }
+    return datum;
+  }
+
+  public static byte[] serialize(Column col, Datum datum) throws IOException {
+    if (datum == null || datum instanceof NullDatum) {
+      return null;
+    }
+
+    byte[] bytes;
+    switch (col.getDataType().getType()) {
+      case INT1:
+      case INT2:
+        bytes = Bytes.toBytes(datum.asInt2());
+        break;
+      case INT4:
+        bytes = Bytes.toBytes(datum.asInt4());
+        break;
+      case INT8:
+        bytes = Bytes.toBytes(datum.asInt8());
+        break;
+      case FLOAT4:
+        bytes = Bytes.toBytes(datum.asFloat4());
+        break;
+      case FLOAT8:
+        bytes = Bytes.toBytes(datum.asFloat8());
+        break;
+      case TEXT:
+        bytes = Bytes.toBytes(datum.asChars());
+        break;
+      default:
+        bytes = null;
+        break;
+    }
+
+    return bytes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
new file mode 100644
index 0000000..43ad7f3
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.storage.fragment.Fragment;
+import 
org.apache.tajo.storage.fragment.StorageFragmentProtos.HBaseFragmentProto;
+
+public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, 
Cloneable {
+  @Expose
+  private String tableName;
+  @Expose
+  private String hbaseTableName;
+  @Expose
+  private byte[] startRow;
+  @Expose
+  private byte[] stopRow;
+  @Expose
+  private String regionLocation;
+  @Expose
+  private boolean last;
+  @Expose
+  private long length;
+
+  public HBaseFragment(String tableName, String hbaseTableName, byte[] 
startRow, byte[] stopRow, String regionLocation) {
+    this.tableName = tableName;
+    this.hbaseTableName = hbaseTableName;
+    this.startRow = startRow;
+    this.stopRow = stopRow;
+    this.regionLocation = regionLocation;
+    this.last = false;
+  }
+
+  public HBaseFragment(ByteString raw) throws InvalidProtocolBufferException {
+    HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
+    builder.mergeFrom(raw);
+    builder.build();
+    init(builder.build());
+  }
+
+  private void init(HBaseFragmentProto proto) {
+    this.tableName = proto.getTableName();
+    this.hbaseTableName = proto.getHbaseTableName();
+    this.startRow = proto.getStartRow().toByteArray();
+    this.stopRow = proto.getStopRow().toByteArray();
+    this.regionLocation = proto.getRegionLocation();
+    this.length = proto.getLength();
+    this.last = proto.getLast();
+  }
+
+  @Override
+  public int compareTo(HBaseFragment t) {
+    return Bytes.compareTo(startRow, t.startRow);
+  }
+
+  @Override
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public String getKey() {
+    return new String(startRow);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return startRow == null || stopRow == null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
+  }
+
+  public void setLength(long length) {
+    this.length = length;
+  }
+
+  @Override
+  public String[] getHosts() {
+    return new String[] {regionLocation};
+  }
+
+  public Object clone() throws CloneNotSupportedException {
+    HBaseFragment frag = (HBaseFragment) super.clone();
+    frag.tableName = tableName;
+    frag.hbaseTableName = hbaseTableName;
+    frag.startRow = startRow;
+    frag.stopRow = stopRow;
+    frag.regionLocation = regionLocation;
+    frag.last = last;
+    frag.length = length;
+    return frag;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof HBaseFragment) {
+      HBaseFragment t = (HBaseFragment) o;
+      if (tableName.equals(t.tableName)
+          && Bytes.equals(startRow, t.startRow)
+          && Bytes.equals(stopRow, t.stopRow)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(tableName, hbaseTableName, startRow, stopRow);
+  }
+
+  @Override
+  public String toString() {
+    return "\"fragment\": {\"tableName\": \""+ tableName + "\", 
hbaseTableName\": \"" + hbaseTableName + "\"" +
+        ", \"startRow\": \"" + new String(startRow) + "\"" +
+        ", \"stopRow\": \"" + new String(stopRow) + "\"" +
+        ", \"length\": \"" + length + "\"}" ;
+  }
+
+  @Override
+  public FragmentProto getProto() {
+    HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
+    builder.setTableName(tableName)
+        .setHbaseTableName(hbaseTableName)
+        .setStartRow(ByteString.copyFrom(startRow))
+        .setStopRow(ByteString.copyFrom(stopRow))
+        .setLast(last)
+        .setLength(length)
+        .setRegionLocation(regionLocation);
+
+    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
+    fragmentBuilder.setId(this.tableName);
+    fragmentBuilder.setContents(builder.buildPartial().toByteString());
+    fragmentBuilder.setStoreType(StoreType.HBASE.name());
+    return fragmentBuilder.build();
+  }
+
+  public byte[] getStartRow() {
+    return startRow;
+  }
+
+  public byte[] getStopRow() {
+    return stopRow;
+  }
+
+  public String getRegionLocation() {
+    return regionLocation;
+  }
+
+  public boolean isLast() {
+    return last;
+  }
+
+  public void setLast(boolean last) {
+    this.last = last;
+  }
+
+  public String getHbaseTableName() {
+    return hbaseTableName;
+  }
+
+  public void setHbaseTableName(String hbaseTableName) {
+    this.hbaseTableName = hbaseTableName;
+  }
+
+  public void setStartRow(byte[] startRow) {
+    this.startRow = startRow;
+  }
+
+  public void setStopRow(byte[] stopRow) {
+    this.stopRow = stopRow;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
new file mode 100644
index 0000000..50f61a8
--- /dev/null
+++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
/**
 * Appender that writes tuples into an HBase table through the client Put API
 * (row-at-a-time with client-side write buffering), as opposed to writing
 * HFiles for bulk load. Row-key/column-family mapping state (columnMapping,
 * isRowKeyMappings, columnKey* arrays, etc.) is inherited from
 * AbstractHBaseAppender and populated by super.init().
 */
public class HBasePutAppender extends AbstractHBaseAppender {
  // Live handle to the target HBase table; opened in init(), closed in close().
  private HTableInterface htable;
  // Running total of row-key + cell bytes written, reported via stats.
  private long totalNumBytes;

  public HBasePutAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
                          Schema schema, TableMeta meta, Path stagingDir) {
    super(conf, taskAttemptId, schema, meta, stagingDir);
  }

  /**
   * Opens the HBase table named by the column mapping and enables client-side
   * write buffering so puts are flushed in batches rather than per row.
   */
  @Override
  public void init() throws IOException {
    super.init();

    Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
    HConnection hconn = ((HBaseStorageManager) StorageManager.getStorageManager((TajoConf)conf, StoreType.HBASE))
        .getConnection(hbaseConf);
    htable = hconn.getTable(columnMapping.getHbaseTableName());
    // Disable per-put auto flush; buffer up to 5 MB before sending to the server.
    htable.setAutoFlushTo(false);
    htable.setWriteBufferSize(5 * 1024 * 1024);
  }

  /**
   * Converts one tuple into an HBase Put: builds the row key, serializes each
   * non-row-key column (binary or text encoding per the mapping), and routes
   * each value to a regular cell, a column-key slot, or a column-value slot.
   */
  @Override
  public void addTuple(Tuple tuple) throws IOException {
    byte[] rowkey = getRowKeyBytes(tuple);
    totalNumBytes += rowkey.length;
    Put put = new Put(rowkey);
    readKeyValues(tuple, rowkey);

    for (int i = 0; i < columnNum; i++) {
      if (isRowKeyMappings[i]) {
        // Row-key columns are already encoded into 'rowkey'; skip.
        continue;
      }
      Datum datum = tuple.get(i);
      byte[] value;
      if (isBinaryColumns[i]) {
        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
      } else {
        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
      }

      // NOTE(review): serialize() may return null (e.g. for NullDatum with the
      // binary serializer); put.add/value.length would then NPE — confirm
      // upstream guarantees non-null values here.
      if (isColumnKeys[i]) {
        columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
      } else if (isColumnValues[i]) {
        columnValueDatas[columnKeyValueDataIndexes[i]] = value;
      } else {
        put.add(mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
        totalNumBytes += value.length;
      }
    }

    // Emit the collected <cfname>:key:/<cfname>:value: pairs as cells.
    for (int i = 0; i < columnKeyDatas.length; i++) {
      put.add(columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
      totalNumBytes += columnKeyDatas[i].length + columnValueDatas[i].length;
    }

    htable.put(put);

    if (enabledStats) {
      stats.incrementRow();
      stats.setNumBytes(totalNumBytes);
    }
  }

  /** Forces buffered puts to the server. */
  @Override
  public void flush() throws IOException {
    htable.flushCommits();
  }

  /** No estimate available for put-based output; always 0. */
  @Override
  public long getEstimatedOutputSize() throws IOException {
    return 0;
  }

  /** Flushes any buffered puts, closes the table handle, and finalizes stats. */
  @Override
  public void close() throws IOException {
    if (htable != null) {
      htable.flushCommits();
      htable.close();
    }
    if (enabledStats) {
      stats.setNumBytes(totalNumBytes);
    }
  }
}

Reply via email to