http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 00edc79..7ac4cc5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.catalog.Schema;
@@ -54,6 +53,7 @@ import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -107,39 +107,6 @@ public class Task {
   private Schema finalSchema = null;
   private TupleComparator sortComp = null;
 
-  static final String OUTPUT_FILE_PREFIX="part-";
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(2);
-          return fmt;
-        }
-      };
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(6);
-          return fmt;
-        }
-      };
-
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(3);
-          return fmt;
-        }
-      };
-
   public Task(String taskRunnerId,
               Path baseDir,
               QueryUnitAttemptId taskId,
@@ -189,13 +156,8 @@ public class Task {
        this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
       }
     } else {
-      // The final result of a task will be written in a file named part-ss-nnnnnnn,
-      // where ss is the subquery id associated with this task, and nnnnnn is the task id.
-      Path outFilePath = StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME,
-          OUTPUT_FILE_PREFIX +
-          OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
-          OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()) + "-" +
-          OUTPUT_FILE_FORMAT_SEQ.get().format(0));
+      Path outFilePath = StorageManager.getFileStorageManager(systemConf).getAppenderFilePath(
+          taskId, queryContext.getStagingDir());
       LOG.info("Output File Path: " + outFilePath);
       context.setOutputPath(outFilePath);
     }
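
For reference, the deleted formatters named each final output file
part-<2-digit execution block id>-<6-digit task id>-<3-digit sequence>.
A minimal Java sketch of that formatting (the ids are hypothetical; whether
StorageManager.getFileStorageManager(systemConf).getAppenderFilePath() keeps
exactly this layout is an assumption here):

    // Illustration only: reproduces the removed NumberFormat-based naming.
    int ebId = 1, taskId = 3, seq = 0;  // hypothetical ids
    String fileName = String.format("part-%02d-%06d-%03d", ebId, taskId, seq);
    // -> "part-01-000003-000"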

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
index 18a67d8..49635d1 100644
--- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
@@ -41,6 +41,7 @@
 <%@ page import="java.text.SimpleDateFormat" %>
 <%@ page import="java.util.Map" %>
 <%@ page import="java.util.Set" %>
+<%@ page import="org.apache.tajo.storage.fragment.Fragment" %>
 
 <%
     String paramQueryId = request.getParameter("queryId");
@@ -102,8 +103,8 @@
     String fragmentInfo = "";
     String delim = "";
     for (CatalogProtos.FragmentProto eachFragment : queryUnit.getAllFragments()) {
-        FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
-        fragmentInfo += delim + fileFragment.toString();
+        Fragment fragment = FragmentConvertor.convert(tajoWorker.getConfig(), eachFragment);
+        fragmentInfo += delim + fragment.toString();
         delim = "<br/>";
     }
 
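
The two-argument FragmentConvertor.convert(conf, proto) resolves the concrete
Fragment class from the store type recorded in the proto, so the page no longer
assumes FileFragment. A hedged sketch of the call pattern (conf and proto stand
in for values already available on the page):

    // Any Fragment implementation (FileFragment, the new HBaseFragment, ...)
    // renders through its own toString().
    Fragment fragment = FragmentConvertor.convert(conf, proto);
    String info = fragment.toString();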

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java
new file mode 100644
index 0000000..a8e4a5c
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.tajo.util.Bytes;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
+
+public class HBaseTestClusterUtil {
+  private static final Log LOG = LogFactory.getLog(HBaseTestClusterUtil.class);
+  private Configuration conf;
+  private MiniHBaseCluster hbaseCluster;
+  private MiniZooKeeperCluster zkCluster;
+  private File testBaseDir;
+  public HBaseTestClusterUtil(Configuration conf, File testBaseDir) {
+    this.conf = conf;
+    this.testBaseDir = testBaseDir;
+  }
+  /**
+   * Returns the path to the default root dir the minicluster uses.
+   * Note: this does not cause the root dir to be created.
+   * @return Fully qualified path for the default hbase root dir
+   * @throws java.io.IOException
+   */
+  public Path getDefaultRootDirPath() throws IOException {
+    FileSystem fs = FileSystem.get(this.conf);
+    return new Path(fs.makeQualified(fs.getHomeDirectory()),"hbase");
+  }
+
+  /**
+   * Creates an hbase rootdir in user home directory.  Also creates hbase
+   * version file.  Normally you won't make use of this method.  Root hbasedir
+   * is created for you as part of mini cluster startup.  You'd only use this
+   * method if you were doing manual operation.
+   * @return Fully qualified path to hbase root dir
+   * @throws java.io.IOException
+   */
+  public Path createRootDir() throws IOException {
+    FileSystem fs = FileSystem.get(this.conf);
+    Path hbaseRootdir = getDefaultRootDirPath();
+    FSUtils.setRootDir(this.conf, hbaseRootdir);
+    fs.mkdirs(hbaseRootdir);
+    FSUtils.setVersion(fs, hbaseRootdir);
+    return hbaseRootdir;
+  }
+
+  public void stopHBaseCluster() throws IOException {
+    if (hbaseCluster != null) {
+      LOG.info("MiniHBaseCluster stopped");
+      hbaseCluster.shutdown();
+      hbaseCluster.waitUntilShutDown();
+      hbaseCluster = null;
+    }
+  }
+
+  public void startHBaseCluster() throws Exception {
+    if (zkCluster == null) {
+      startMiniZKCluster();
+    }
+    if (hbaseCluster != null) {
+      return;
+    }
+
+    System.setProperty("HBASE_ZNODE_FILE", testBaseDir + "/hbase_znode_file");
+    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
+      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
+    }
+    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
+      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
+    }
+    conf.setBoolean(REPLICATION_ENABLE_KEY, false);
+    createRootDir();
+
+    Configuration c = HBaseConfiguration.create(this.conf);
+
+    hbaseCluster = new MiniHBaseCluster(c, 1);
+
+    // Don't leave here till we've done a successful scan of the hbase:meta
+    HTable t = new HTable(c, TableName.META_TABLE_NAME);
+    ResultScanner s = t.getScanner(new Scan());
+    while (s.next() != null) {
+      continue;
+    }
+    s.close();
+    t.close();
+    LOG.info("MiniHBaseCluster started");
+
+  }
+
+  /**
+   * Start a mini ZK cluster. If the property "test.hbase.zookeeper.property.clientPort" is set,
+   *  the port mentioned is used as the default port for ZooKeeper.
+   */
+  public MiniZooKeeperCluster startMiniZKCluster()
+      throws Exception {
+    File zkDataPath = new File(testBaseDir, "zk");
+    if (this.zkCluster != null) {
+      throw new IOException("Cluster already running at " + zkDataPath);
+    }
+    this.zkCluster = new MiniZooKeeperCluster(conf);
+    final int defPort = this.conf.getInt("test.hbase.zookeeper.property.clientPort", 0);
+    if (defPort > 0){
+      // If there is a port in the config file, we use it.
+      this.zkCluster.setDefaultClientPort(defPort);
+    }
+    int clientPort =  this.zkCluster.startup(zkDataPath, 1);
+    this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort));
+    LOG.info("MiniZooKeeperCluster started");
+
+    return this.zkCluster;
+  }
+
+  public void stopZooKeeperCluster() throws IOException {
+    if (zkCluster != null) {
+      LOG.info("MiniZooKeeperCluster stopped");
+      zkCluster.shutdown();
+      zkCluster = null;
+    }
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public MiniZooKeeperCluster getMiniZooKeeperCluster() {
+    return zkCluster;
+  }
+
+  public MiniHBaseCluster getMiniHBaseCluster() {
+    return hbaseCluster;
+  }
+
+  public HTableDescriptor getTableDescriptor(String tableName) throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      return admin.getTableDescriptor(Bytes.toBytes(tableName));
+    } finally {
+      admin.close();
+    }
+  }
+
+  public void createTable(HTableDescriptor hTableDesc) throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      admin.createTable(hTableDesc);
+    } finally {
+      admin.close();
+    }
+  }
+}
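
A typical lifecycle for this utility, mirroring how the new TestHBaseTable
below drives it from its @BeforeClass/@AfterClass hooks (sketch; testingCluster
is the shared TajoTestingCluster):

    HBaseTestClusterUtil util = testingCluster.getHBaseUtil();
    util.startHBaseCluster();   // starts the mini ZK cluster first if needed
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("hbase_table"));
    desc.addFamily(new HColumnDescriptor("col1"));
    util.createTable(desc);
    // ... run queries against the mapped table ...
    util.stopHBaseCluster();    // ZK is stopped separately at cluster shutdown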

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 604a56b..373774e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -169,9 +169,9 @@ public class QueryTestCaseBase {
   private static String currentDatabase;
   private static Set<String> createdTableGlobalSet = new HashSet<String>();
   // queries and results directory corresponding to subclass class.
-  private Path currentQueryPath;
-  private Path currentResultPath;
-  private Path currentDatasetPath;
+  protected Path currentQueryPath;
+  protected Path currentResultPath;
+  protected Path currentDatasetPath;
 
   // for getting a method name
   @Rule public TestName name = new TestName();
@@ -303,7 +303,7 @@ public class QueryTestCaseBase {
     return executeFile(getMethodName() + ".sql");
   }
 
-  private String getMethodName() {
+  protected String getMethodName() {
     String methodName = name.getMethodName();
     // In the case of parameter execution name's pattern is methodName[0]
     if (methodName.endsWith("]")) {

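
Raising these members from private to protected lets storage-specific suites
locate their per-class query, result, and dataset directories; the new
TestHBaseTable depends on this, for example (sketch):

    // In a QueryTestCaseBase subclass: resolve a data file shipped with the tests.
    String splitFilePath = currentDatasetPath + "/splits.data";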
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 8ddc264..d5f1a67 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -69,6 +69,7 @@ public class TajoTestingCluster {
   private FileSystem defaultFS;
   private MiniDFSCluster dfsCluster;
        private MiniCatalogServer catalogServer;
+  private HBaseTestClusterUtil hbaseUtil;
 
   private TajoMaster tajoMaster;
   private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
@@ -259,6 +260,10 @@ public class TajoTestingCluster {
     return this.defaultFS;
   }
 
+  public HBaseTestClusterUtil getHBaseUtil() {
+    return hbaseUtil;
+  }
+
   ////////////////////////////////////////////////////////
   // Catalog Section
   ////////////////////////////////////////////////////////
@@ -482,6 +487,8 @@ public class TajoTestingCluster {
     startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts);
     this.dfsCluster.waitClusterUp();
 
+    hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir);
+
     if(!standbyWorkerMode) {
       startMiniYarnCluster();
     }
@@ -569,7 +576,6 @@ public class TajoTestingCluster {
     }
 
     if(this.dfsCluster != null) {
-
       try {
         FileSystem fs = this.dfsCluster.getFileSystem();
         if (fs != null) fs.close();
@@ -588,6 +594,10 @@ public class TajoTestingCluster {
       }
       this.clusterTestBuildDir = null;
     }
+
+    hbaseUtil.stopHBaseCluster();
+    hbaseUtil.stopZooKeeperCluster();
+
     LOG.info("Minicluster is down");
   }
 
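
The new getHBaseUtil() accessor is how integration tests reach the embedded
HBase/ZooKeeper configuration, e.g. (sketch, following the pattern used
throughout TestHBaseTable):

    Configuration hbaseConf = testingCluster.getHBaseUtil().getConf();
    String zkPort = hbaseConf.get(HConstants.ZOOKEEPER_CLIENT_PORT);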

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
index 7f402a1..e331599 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
@@ -25,6 +25,7 @@ import org.apache.tajo.engine.eval.ExprTestBase;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.text.DecimalFormat;
 
 import static org.apache.tajo.common.TajoDataTypes.Type.*;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 5cc89b8..fee6f4c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -380,7 +380,7 @@ public class TestPlannerUtil {
     int index = 0;
 
     for (int i = startIndex; i < startIndex + expectedSize; i++, index++) {
-      FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), StoreType.CSV, fragments[index]);
+      FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), fragments[index]);
       assertEquals(expectedFiles.get(i), fragment.getPath());
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 2c02405..aa36ea7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -196,8 +196,7 @@ public class TestBSTIndexExec {
       Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
           "Error: There is no table matched to %s", scanNode.getTableName());
 
-      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), meta.getStoreType(),
-          ctx.getTables(scanNode.getTableName()));
+      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), ctx.getTables(scanNode.getTableName()));
       
       Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index c86c4c7..06f4001 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -48,6 +48,7 @@ import org.apache.tajo.master.session.Session;
 import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
@@ -76,7 +77,6 @@ import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm;
-import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import static org.junit.Assert.*;
 
 public class TestPhysicalPlanner {
@@ -441,7 +441,6 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(defaultContext, context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-
     TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
new file mode 100644
index 0000000..cb9aa74
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -0,0 +1,1474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.hbase.*;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.InetAddress;
+import java.sql.ResultSet;
+import java.text.DecimalFormat;
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+
+@Category(IntegrationTest.class)
+public class TestHBaseTable extends QueryTestCaseBase {
+  private static final Log LOG = LogFactory.getLog(TestHBaseTable.class);
+
+  @BeforeClass
+  public static void beforeClass() {
+    try {
+      testingCluster.getHBaseUtil().startHBaseCluster();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    try {
+      testingCluster.getHBaseUtil().stopHBaseCluster();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testVerifyCreateHBaseTableRequiredMeta() throws Exception {
+    try {
+      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " 
+
+          "USING hbase").close();
+
+      fail("hbase table must have 'table' meta");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().indexOf("HBase mapped table") >= 0);
+    }
+
+    try {
+      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " 
+
+          "USING hbase " +
+          "WITH ('table'='hbase_table')").close();
+
+      fail("hbase table must have 'columns' meta");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().indexOf("'columns' property is required") >= 
0);
+    }
+
+    try {
+      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " 
+
+          "USING hbase " +
+          "WITH ('table'='hbase_table', 'columns'='col1:,col2:')").close();
+
+      fail("hbase table must have 'hbase.zookeeper.quorum' meta");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().indexOf("HBase mapped table") >= 0);
+    }
+  }
+
+  @Test
+  public void testCreateHBaseTable() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text, 
col3 text, col4 text) " +
+        "USING hbase WITH ('table'='hbase_table', 
'columns'=':key,col2:a,col3:,col2:b', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table1");
+
+    HTableDescriptor hTableDesc = testingCluster.getHBaseUtil().getTableDescriptor("hbase_table");
+    assertNotNull(hTableDesc);
+    assertEquals("hbase_table", hTableDesc.getNameAsString());
+
+    HColumnDescriptor[] hColumns = hTableDesc.getColumnFamilies();
+    // col1 is mapped to rowkey
+    assertEquals(2, hColumns.length);
+    assertEquals("col2", hColumns[0].getNameAsString());
+    assertEquals("col3", hColumns[1].getNameAsString());
+
+    executeString("DROP TABLE hbase_mapped_table1 PURGE").close();
+
+    HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    try {
+      assertFalse(hAdmin.tableExists("hbase_table"));
+    } finally {
+      hAdmin.close();
+    }
+  }
+
+  @Test
+  public void testCreateNotExistsExternalHBaseTable() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    try {
+      executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table1 (col1 
text, col2 text, col3 text, col4 text) " +
+          "USING hbase WITH ('table'='external_hbase_table', 
'columns'=':key,col2:a,col3:,col2:b', " +
+          "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+          "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+      fail("External table should be a existed table.");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().indexOf("External table should be a existed 
table.") >= 0);
+    }
+  }
+
+  @Test
+  public void testCreateRowFieldWithNonText() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    try {
+      executeString("CREATE TABLE hbase_mapped_table2 (rk1 int4, rk2 text, 
col3 text, col4 text) " +
+          "USING hbase WITH ('table'='hbase_table', 
'columns'='0:key#b,1:key,col3:,col2:b', " +
+          "'hbase.rowkey.delimiter'='_', " +
+          "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+          "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+      fail("Key field type should be TEXT type");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().indexOf("Key field type should be TEXT type") 
>= 0);
+    }
+  }
+
+  @Test
+  public void testCreateExternalHBaseTable() throws Exception {
+    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table_not_purge"));
+    hTableDesc.addFamily(new HColumnDescriptor("col1"));
+    hTableDesc.addFamily(new HColumnDescriptor("col2"));
+    hTableDesc.addFamily(new HColumnDescriptor("col3"));
+    testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, 
col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table_not_purge', 
'columns'=':key,col1:a,col2:,col3:b', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    executeString("DROP TABLE external_hbase_mapped_table").close();
+
+    HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    try {
+      assertTrue(hAdmin.tableExists("external_hbase_table_not_purge"));
+      hAdmin.disableTable("external_hbase_table_not_purge");
+      hAdmin.deleteTable("external_hbase_table_not_purge");
+    } finally {
+      hAdmin.close();
+    }
+  }
+
+  @Test
+  public void testSimpleSelectQuery() throws Exception {
+    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    hTableDesc.addFamily(new HColumnDescriptor("col1"));
+    hTableDesc.addFamily(new HColumnDescriptor("col2"));
+    hTableDesc.addFamily(new HColumnDescriptor("col3"));
+    testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, 
col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 
'columns'=':key,col1:a,col2:,col3:b', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
+        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HTableInterface htable = hconn.getTable("external_hbase_table");
+
+    try {
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(i).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from external_hbase_mapped_table 
where rk > '20'");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  @Test
+  public void testBinaryMappedQuery() throws Exception {
+    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    hTableDesc.addFamily(new HColumnDescriptor("col1"));
+    hTableDesc.addFamily(new HColumnDescriptor("col2"));
+    hTableDesc.addFamily(new HColumnDescriptor("col3"));
+    testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk int8, 
col1 text, col2 text, col3 int4)\n " +
+        "USING hbase WITH ('table'='external_hbase_table', 
'columns'=':key#b,col1:a,col2:,col3:b#b', \n" +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "', \n" +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
+        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HTableInterface htable = hconn.getTable("external_hbase_table");
+
+    try {
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(Bytes.toBytes((long) i));
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes(i));
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from external_hbase_mapped_table 
where rk > 20");
+      assertResultSet(res);
+      res.close();
+
+      //Projection
+      res = executeString("select col3, col2, rk from 
external_hbase_mapped_table where rk > 95");
+
+      String expected = "col3,col2,rk\n" +
+          "-------------------------------\n" +
+          "96,{\"k1\":\"k1-96\", \"k2\":\"k2-96\"},96\n" +
+          "97,{\"k1\":\"k1-97\", \"k2\":\"k2-97\"},97\n" +
+          "98,{\"k1\":\"k1-98\", \"k2\":\"k2-98\"},98\n" +
+          "99,{\"k1\":\"k1-99\", \"k2\":\"k2-99\"},99\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  @Test
+  public void testColumnKeyValueSelectQuery() throws Exception {
+    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    hTableDesc.addFamily(new HColumnDescriptor("col2"));
+    hTableDesc.addFamily(new HColumnDescriptor("col3"));
+    testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 
text, col2_key text, col2_value text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 
'columns'=':key,col2:key:,col2:value:,col3:', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
+        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HTableInterface htable = hconn.getTable("external_hbase_table");
+
+    try {
+      for (int i = 0; i < 10; i++) {
+        Put put = new Put(Bytes.toBytes("rk-" + i));
+        for (int j = 0; j < 5; j++) {
+          put.add("col2".getBytes(), ("key-" + j).getBytes(), 
Bytes.toBytes("value-" + j));
+        }
+        put.add("col3".getBytes(), "".getBytes(), ("col3-value-" + 
i).getBytes());
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from external_hbase_mapped_table 
where rk1 >= 'rk-0'");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  @Test
+  public void testRowFieldSelectQuery() throws Exception {
+    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    hTableDesc.addFamily(new HColumnDescriptor("col3"));
+    testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 
text, rk2 text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 
'columns'='0:key,1:key,col3:a', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
+        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HTableInterface htable = hconn.getTable("external_hbase_table");
+
+    try {
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(("field1-" + i + "_field2-" + i).getBytes());
+        put.add("col3".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from external_hbase_mapped_table 
where rk1 > 'field1-20'");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  @Test
+  public void testIndexPredication() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 
text, col3 text) " +
+        "USING hbase WITH ('table'='hbase_table', 
'columns'=':key,col1:a,col2:,col3:b', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    hAdmin.tableExists("hbase_table");
+
+    HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+    try {
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        htable.put(put);
+      }
+      assertIndexPredication(false);
+
+      ResultSet res = executeString("select * from hbase_mapped_table where rk 
>= '020' and rk <= '055'");
+      assertResultSet(res);
+      res.close();
+
+      res = executeString("select * from hbase_mapped_table where rk = '021'");
+      String expected = "rk,col1,col2,col3\n" +
+          "-------------------------------\n" +
+          "021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      htable.close();
+      hAdmin.close();
+    }
+  }
+
+  @Test
+  public void testCompositeRowIndexPredication() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, rk2 text, col1 
text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='hbase_table', 
'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    hAdmin.tableExists("hbase_table");
+
+    HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+    try {
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put((df.format(i) + "_" + df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        htable.put(put);
+      }
+
+      Scan scan = new Scan();
+      scan.setStartRow("021".getBytes());
+      scan.setStopRow(("021_" + new String(new 
char[]{Character.MAX_VALUE})).getBytes());
+      Filter filter = new InclusiveStopFilter(scan.getStopRow());
+      scan.setFilter(filter);
+
+      ResultScanner scanner = htable.getScanner(scan);
+      Result result = scanner.next();
+      assertNotNull(result);
+      assertEquals("021_021", new String(result.getRow()));
+      scanner.close();
+
+      assertIndexPredication(true);
+
+      ResultSet res = executeString("select * from hbase_mapped_table where rk 
= '021'");
+      String expected = "rk,rk2,col1,col2,col3\n" +
+          "-------------------------------\n" +
+          "021,021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      htable.close();
+      hAdmin.close();
+    }
+  }
+
+  private void assertIndexPredication(boolean isCompositeRowKey) throws Exception {
+    String postFix = isCompositeRowKey ? "_" + new String(new char[]{Character.MAX_VALUE}) : "";
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    ScanNode scanNode = new ScanNode(1);
+
+    // where rk = '021'
+    EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("021")));
+    scanNode.setQual(evalNodeEq);
+    StorageManager storageManager = StorageManager.getStorageManager(conf, StoreType.HBASE);
+    List<Fragment> fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    assertEquals(1, fragments.size());
+    assertEquals("021", new 
String(((HBaseFragment)fragments.get(0)).getStartRow()));
+    assertEquals("021" + postFix, new 
String(((HBaseFragment)fragments.get(0)).getStopRow()));
+
+    // where rk >= '020' and rk <= '055'
+    EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("020")));
+    EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("055")));
+    EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
+    scanNode.setQual(evalNodeA);
+
+    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, 
scanNode);
+    assertEquals(2, fragments.size());
+    HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    HBaseFragment fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+
+    // where (rk >= '020' and rk <= '055') or rk = '075'
+    EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("075")));
+    EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
+    scanNode.setQual(evalNodeB);
+    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, 
scanNode);
+    assertEquals(3, fragments.size());
+    fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+
+    HBaseFragment fragment3 = (HBaseFragment) fragments.get(2);
+    assertEquals("075", new String(fragment3.getStartRow()));
+    assertEquals("075" + postFix, new String(fragment3.getStopRow()));
+
+
+    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
+    EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("072")));
+    EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("078")));
+    EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
+    scanNode.setQual(evalNodeD);
+    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, 
scanNode);
+    assertEquals(3, fragments.size());
+
+    fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+
+    fragment3 = (HBaseFragment) fragments.get(2);
+    assertEquals("072", new String(fragment3.getStartRow()));
+    assertEquals("078" + postFix, new String(fragment3.getStopRow()));
+
+    // where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059')
+    evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("057")));
+    evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("059")));
+    evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
+    scanNode.setQual(evalNodeD);
+    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, 
scanNode);
+    assertEquals(2, fragments.size());
+
+    fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("059" + postFix, new String(fragment2.getStopRow()));
+  }
+
+  @Test
+  public void testNonForwardQuery() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 
text, col3 int) " +
+        "USING hbase WITH ('table'='hbase_table', 
'columns'=':key,col1:a,col2:,col3:#b', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    HTable htable = null;
+    try {
+      hAdmin.tableExists("hbase_table");
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "".getBytes(), Bytes.toBytes(i));
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from hbase_mapped_table");
+      assertResultSet(res);
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      hAdmin.close();
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testJoin() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 
text, col3 int8) " +
+        "USING hbase WITH ('table'='hbase_table', 
'columns'=':key,col1:a,col2:,col3:b#b', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    HTable htable = null;
+    try {
+      hAdmin.tableExists("hbase_table");
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes((long) i));
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select a.rk, a.col1, a.col2, a.col3, 
b.l_orderkey, b.l_linestatus " +
+          "from hbase_mapped_table a " +
+          "join default.lineitem b on a.col3 = b.l_orderkey");
+      assertResultSet(res);
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      hAdmin.close();
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertInto() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 
text, col3 int4) " +
+        "USING hbase WITH ('table'='hbase_table', 
'columns'=':key,col1:a,col2:,col3:b#b', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    executeString("insert into hbase_mapped_table " +
+        "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from 
default.lineitem ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), 
Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
+          new boolean[]{false, false, false, true}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoMultiRegion() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    DecimalFormat df = new DecimalFormat("000");
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoMultiRegion2() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(i + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), 
"hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
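+  // The next test supplies split keys through 'hbase.split.rowkeys.file'
+  // instead of an inline list; the splits.data resource it points at (added
+  // later in this patch) lists the same keys: 010, 040, 060, 080.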
+  @Test
+  public void testInsertIntoMultiRegionWithSplitFile() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = 
testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    String splitFilePath = currentDatasetPath + "/splits.data";
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys.file'='" + splitFilePath + "', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), 
"hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    DecimalFormat df = new DecimalFormat("000");
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), 
"hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
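+  // The next test maps a composite row key: '0:key' and '1:key' bind two
+  // Tajo columns to row-key segments joined by 'hbase.rowkey.delimiter'.
+  // For example, under the options below a row (id1='001', id2='101') would
+  // land under the HBase row key '001_101' (worked example inferred from the
+  // table options and test data, not from separate documentation).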
+  @Test
+  public void testInsertIntoMultiRegionMultiRowFields() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = 
testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 
text) " +
+        "USING hbase WITH ('table'='hbase_table', 
'columns'='0:key,1:key,col1:a', " +
+        "'hbase.split.rowkeys'='001,002,003,004,005,006,007,008,009', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), 
"hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id1", Type.TEXT);
+    schema.addColumn("id2", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    DecimalFormat df = new DecimalFormat("000");
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|" + (i + 100) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id1, id2, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), 
"hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, null, Bytes.toBytes("col1")},
+          new byte[][]{null, null, Bytes.toBytes("a")},
+          new boolean[]{false, false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
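+  // The '#b' suffix in ':key#b' below requests binary serialization: the
+  // INT4 row key is stored in a binary encoding rather than as text, which
+  // is why the scan assertion passes binaries[]={true, false} and the row
+  // key is decoded via HBaseBinarySerializerDeserializer in
+  // resultSetToString. (Reading of the option inferred from this test.)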
+  @Test
+  public void testInsertIntoBinaryMultiRegion() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = 
testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk int4, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key#b,col1:a', " 
+
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), 
"hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(i + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), 
"hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{true, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
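+  // The next test maps a whole column family as parallel arrays: 'col2:key:'
+  // exposes the qualifiers of family 'col2' and 'col2:value:' exposes the
+  // matching cell values, so the three puts per row key come back as TEXT
+  // arrays such as ["ck-0", "ck-1", "ck-2"] in the expected result below.
+  // (Interpretation inferred from the expected output in this test.)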
+  @Test
+  public void testInsertIntoColumnKeyValue() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = 
testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col2_key text, 
col2_value text, col3 text) " +
+        "USING hbase WITH ('table'='hbase_table', 
'columns'=':key,col2:key:,col2:value:,col3:', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), 
"hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("rk", Type.TEXT);
+    schema.addColumn("col2_key", Type.TEXT);
+    schema.addColumn("col2_value", Type.TEXT);
+    schema.addColumn("col3", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 20; i >= 0; i--) {
+      for (int j = 0; j < 3; j++) {
+        datas.add(i + "|ck-" + j + "|value-" + j + "|col3-" + i);
+      }
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select rk, col2_key, col2_value, col3 from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), 
"hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, null, null},
+          new boolean[]{false, false, false}, tableDesc.getSchema()));
+
+      ResultSet res = executeString("select * from hbase_mapped_table");
+
+      String expected = "rk,col2_key,col2_value,col3\n" +
+          "-------------------------------\n" +
+          "0,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-0\n" +
+          "1,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-1\n" +
+          "10,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-10\n" +
+          "11,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-11\n" +
+          "12,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-12\n" +
+          "13,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-13\n" +
+          "14,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-14\n" +
+          "15,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-15\n" +
+          "16,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-16\n" +
+          "17,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-17\n" +
+          "18,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-18\n" +
+          "19,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-19\n" +
+          "2,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-2\n" +
+          "20,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-20\n" +
+          "3,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-3\n" +
+          "4,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-4\n" +
+          "5,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-5\n" +
+          "6,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-6\n" +
+          "7,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-7\n" +
+          "8,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-8\n" +
+          "9,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", 
\"value-2\"],col3-9\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoDifferentType() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = 
testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(i + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    try {
+      executeString("insert into hbase_mapped_table " +
+          "select id, name from base_table ").close();
+      fail("If inserting data type different with target table data type, 
should throw exception");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().indexOf("VerifyException") >= 0);
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+    }
+  }
+
+  @Test
+  public void testInsertIntoRowField() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = 
testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 
text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='hbase_table', 
'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), 
"hbase_mapped_table");
+
+    executeString("insert into hbase_mapped_table " +
+        "select l_orderkey::text, l_partkey::text, l_shipdate, l_returnflag, 
l_suppkey::text from default.lineitem ");
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), 
"hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), 
Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), Bytes.toBytes(""), 
Bytes.toBytes("b")},
+          new boolean[]{false, false, false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testCTAS() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = 
testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    DecimalFormat df = new DecimalFormat("000");
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')" +
+        " as " +
+        "select id, name from base_table"
+    ).close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), 
"hbase_mapped_table");
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), 
"hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
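+  // The next test sets the INSERT_PUT_MODE session variable, which makes the
+  // INSERT issue direct HBase Puts instead of writing and bulk-loading
+  // HFiles. Because l_orderkey is not unique, duplicate row keys collapse
+  // into a single HBase row in this mode, so the expected scan output
+  // differs from testInsertInto. (Behavioral summary inferred from this
+  // test.)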
+  @Test
+  public void testInsertIntoUsingPut() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = 
testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 
text, col3 int4) " +
+        "USING hbase WITH ('table'='hbase_table', 
'columns'=':key,col1:a,col2:,col3:b#b', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), 
"hbase_mapped_table");
+
+    Map<String, String> sessions = new HashMap<String, String>();
+    sessions.put(HBaseStorageConstants.INSERT_PUT_MODE, "true");
+    client.updateSessionVariables(sessions);
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      executeString("insert into hbase_mapped_table " +
+          "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from 
default.lineitem ").close();
+
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), 
"hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      // The result differs from testInsertInto because l_orderkey is not
+      // unique: in put mode, rows with duplicate keys overwrite one another.
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), 
Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
+          new boolean[]{false, false, false, true}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      
client.unsetSessionVariables(TUtil.newList(HBaseStorageConstants.INSERT_PUT_MODE));
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
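+  // The next test uses INSERT INTO LOCATION with the hbase handler: instead
+  // of loading into a live HBase table, the query writes HFiles under the
+  // given path, one subdirectory per column family ('col1' and 'col2' here),
+  // as the directory assertions below verify. (Summary inferred from this
+  // test.)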
+  @Test
+  public void testInsertIntoLocation() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = 
testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 
text) " +
+        "USING hbase WITH ('table'='hbase_table', 
'columns'=':key,col1:a,col2:', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + 
"')").close();
+
+    assertTableExists("hbase_mapped_table");
+
+    try {
+      // create test table
+      KeyValueSet tableOptions = new KeyValueSet();
+      tableOptions.set(StorageConstants.CSVFILE_DELIMITER, 
StorageConstants.DEFAULT_FIELD_DELIMITER);
+      tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.TEXT);
+      schema.addColumn("name", Type.TEXT);
+      schema.addColumn("comment", Type.TEXT);
+      List<String> datas = new ArrayList<String>();
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 99; i >= 0; i--) {
+        datas.add(df.format(i) + "|value" + i + "|comment-" + i);
+      }
+      TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+          schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+      executeString("insert into location '/tmp/hfile_test' " +
+          "USING hbase WITH ('table'='hbase_table', 
'columns'=':key,col1:a,col2:', " +
+          "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+          "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')" +
+          "select id, name, comment from base_table ").close();
+
+      FileSystem fs = testingCluster.getDefaultFileSystem();
+      Path path = new Path("/tmp/hfile_test");
+      assertTrue(fs.exists(path));
+
+      FileStatus[] files = fs.listStatus(path);
+      assertNotNull(files);
+      assertEquals(2, files.length);
+
+      assertEquals("/tmp/hfile_test/col2", 
files[1].getPath().toUri().getPath());
+
+      int index = 1;
+      for (FileStatus eachFile: files) {
+        assertEquals("/tmp/hfile_test/col" + index, 
eachFile.getPath().toUri().getPath());
+        for (FileStatus subFile: fs.listStatus(eachFile.getPath())) {
+          assertTrue(subFile.isFile());
+          assertTrue(subFile.getLen() > 0);
+        }
+        index++;
+      }
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+    }
+  }
+
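+  // Helper used by the assertions above: renders an HBase scan as one line
+  // per row, starting with the row key and followed by each requested
+  // column. A null qualifier renders the whole family map as a JSON-like
+  // object; the binaries[] flags select binary vs. text deserialization per
+  // column.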
+  private String resultSetToString(ResultScanner scanner,
+                                   byte[][] cfNames, byte[][] qualifiers,
+                                   boolean[] binaries,
+                                   Schema schema) throws Exception {
+    StringBuilder sb = new StringBuilder();
+    Result result = null;
+    while ( (result = scanner.next()) != null ) {
+      if (binaries[0]) {
+        
sb.append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(0), 
result.getRow()).asChar());
+      } else {
+        sb.append(new String(result.getRow()));
+      }
+
+      for (int i = 0; i < cfNames.length; i++) {
+        if (cfNames[i] == null) {
+          //rowkey
+          continue;
+        }
+        if (qualifiers[i] == null) {
+          Map<byte[], byte[]> values = result.getFamilyMap(cfNames[i]);
+          if (values == null) {
+            sb.append(", null");
+          } else {
+            sb.append(", {");
+            String delim = "";
+            for (Map.Entry<byte[], byte[]> valueEntry: values.entrySet()) {
+              byte[] keyBytes = valueEntry.getKey();
+              byte[] valueBytes = valueEntry.getValue();
+
+              if (binaries[i]) {
+                sb.append(delim).append("\"").append(keyBytes == null ? "" : 
Bytes.toLong(keyBytes)).append("\"");
+                sb.append(": 
\"").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), 
valueBytes)).append("\"");
+              } else {
+                sb.append(delim).append("\"").append(keyBytes == null ? "" : 
new String(keyBytes)).append("\"");
+                sb.append(": 
\"").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), 
valueBytes)).append("\"");
+              }
+              delim = ", ";
+            }
+            sb.append("}");
+          }
+        } else {
+          byte[] value = result.getValue(cfNames[i], qualifiers[i]);
+          if (value == null) {
+            sb.append(", null");
+          } else {
+            if (binaries[i]) {
+              sb.append(", 
").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), 
value));
+            } else {
+              sb.append(", 
").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), 
value));
+            }
+          }
+        }
+      }
+      sb.append("\n");
+    }
+
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index df4b25c..a16d9f0 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -120,7 +120,7 @@ public class TestRangeRetrieverHandler {
 
     Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv");
     fs.mkdirs(tableDir.getParent());
-    Appender appender = sm.getAppender(employeeMeta, schema, tableDir);
+    Appender appender = ((FileStorageManager)sm).getAppender(employeeMeta, 
schema, tableDir);
     appender.init();
 
     Tuple tuple = new VTuple(schema.size());
@@ -245,7 +245,7 @@ public class TestRangeRetrieverHandler {
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path tablePath = StorageUtil.concatPath(testDir, 
"testGetFromDescendingOrder", "table.csv");
     fs.mkdirs(tablePath.getParent());
-    Appender appender = sm.getAppender(meta, schema, tablePath);
+    Appender appender = ((FileStorageManager)sm).getAppender(meta, schema, 
tablePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
     for (int i = (TEST_TUPLE - 1); i >= 0 ; i--) {

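Note: the two hunks above reflect getAppender() being provided by
FileStorageManager rather than by the StorageManager reference these tests
hold, hence the added downcasts. A minimal sketch of the resulting call
pattern, assuming sm refers to file-based storage as in these tests:

    // downcast, assuming file-backed storage (as in these tests)
    Appender appender = ((FileStorageManager) sm).getAppender(meta, schema, tablePath);
    appender.init();
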
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data 
b/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data
new file mode 100644
index 0000000..417d480
--- /dev/null
+++ b/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data
@@ -0,0 +1,4 @@
+010
+040
+060
+080
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result
 
b/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result
new file mode 100644
index 0000000..8d50bf1
--- /dev/null
+++ 
b/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result
@@ -0,0 +1,81 @@
+rk,col1,col2,col3
+-------------------------------
+21,a-21,{"k1":"k1-21", "k2":"k2-21"},21
+22,a-22,{"k1":"k1-22", "k2":"k2-22"},22
+23,a-23,{"k1":"k1-23", "k2":"k2-23"},23
+24,a-24,{"k1":"k1-24", "k2":"k2-24"},24
+25,a-25,{"k1":"k1-25", "k2":"k2-25"},25
+26,a-26,{"k1":"k1-26", "k2":"k2-26"},26
+27,a-27,{"k1":"k1-27", "k2":"k2-27"},27
+28,a-28,{"k1":"k1-28", "k2":"k2-28"},28
+29,a-29,{"k1":"k1-29", "k2":"k2-29"},29
+30,a-30,{"k1":"k1-30", "k2":"k2-30"},30
+31,a-31,{"k1":"k1-31", "k2":"k2-31"},31
+32,a-32,{"k1":"k1-32", "k2":"k2-32"},32
+33,a-33,{"k1":"k1-33", "k2":"k2-33"},33
+34,a-34,{"k1":"k1-34", "k2":"k2-34"},34
+35,a-35,{"k1":"k1-35", "k2":"k2-35"},35
+36,a-36,{"k1":"k1-36", "k2":"k2-36"},36
+37,a-37,{"k1":"k1-37", "k2":"k2-37"},37
+38,a-38,{"k1":"k1-38", "k2":"k2-38"},38
+39,a-39,{"k1":"k1-39", "k2":"k2-39"},39
+40,a-40,{"k1":"k1-40", "k2":"k2-40"},40
+41,a-41,{"k1":"k1-41", "k2":"k2-41"},41
+42,a-42,{"k1":"k1-42", "k2":"k2-42"},42
+43,a-43,{"k1":"k1-43", "k2":"k2-43"},43
+44,a-44,{"k1":"k1-44", "k2":"k2-44"},44
+45,a-45,{"k1":"k1-45", "k2":"k2-45"},45
+46,a-46,{"k1":"k1-46", "k2":"k2-46"},46
+47,a-47,{"k1":"k1-47", "k2":"k2-47"},47
+48,a-48,{"k1":"k1-48", "k2":"k2-48"},48
+49,a-49,{"k1":"k1-49", "k2":"k2-49"},49
+50,a-50,{"k1":"k1-50", "k2":"k2-50"},50
+51,a-51,{"k1":"k1-51", "k2":"k2-51"},51
+52,a-52,{"k1":"k1-52", "k2":"k2-52"},52
+53,a-53,{"k1":"k1-53", "k2":"k2-53"},53
+54,a-54,{"k1":"k1-54", "k2":"k2-54"},54
+55,a-55,{"k1":"k1-55", "k2":"k2-55"},55
+56,a-56,{"k1":"k1-56", "k2":"k2-56"},56
+57,a-57,{"k1":"k1-57", "k2":"k2-57"},57
+58,a-58,{"k1":"k1-58", "k2":"k2-58"},58
+59,a-59,{"k1":"k1-59", "k2":"k2-59"},59
+60,a-60,{"k1":"k1-60", "k2":"k2-60"},60
+61,a-61,{"k1":"k1-61", "k2":"k2-61"},61
+62,a-62,{"k1":"k1-62", "k2":"k2-62"},62
+63,a-63,{"k1":"k1-63", "k2":"k2-63"},63
+64,a-64,{"k1":"k1-64", "k2":"k2-64"},64
+65,a-65,{"k1":"k1-65", "k2":"k2-65"},65
+66,a-66,{"k1":"k1-66", "k2":"k2-66"},66
+67,a-67,{"k1":"k1-67", "k2":"k2-67"},67
+68,a-68,{"k1":"k1-68", "k2":"k2-68"},68
+69,a-69,{"k1":"k1-69", "k2":"k2-69"},69
+70,a-70,{"k1":"k1-70", "k2":"k2-70"},70
+71,a-71,{"k1":"k1-71", "k2":"k2-71"},71
+72,a-72,{"k1":"k1-72", "k2":"k2-72"},72
+73,a-73,{"k1":"k1-73", "k2":"k2-73"},73
+74,a-74,{"k1":"k1-74", "k2":"k2-74"},74
+75,a-75,{"k1":"k1-75", "k2":"k2-75"},75
+76,a-76,{"k1":"k1-76", "k2":"k2-76"},76
+77,a-77,{"k1":"k1-77", "k2":"k2-77"},77
+78,a-78,{"k1":"k1-78", "k2":"k2-78"},78
+79,a-79,{"k1":"k1-79", "k2":"k2-79"},79
+80,a-80,{"k1":"k1-80", "k2":"k2-80"},80
+81,a-81,{"k1":"k1-81", "k2":"k2-81"},81
+82,a-82,{"k1":"k1-82", "k2":"k2-82"},82
+83,a-83,{"k1":"k1-83", "k2":"k2-83"},83
+84,a-84,{"k1":"k1-84", "k2":"k2-84"},84
+85,a-85,{"k1":"k1-85", "k2":"k2-85"},85
+86,a-86,{"k1":"k1-86", "k2":"k2-86"},86
+87,a-87,{"k1":"k1-87", "k2":"k2-87"},87
+88,a-88,{"k1":"k1-88", "k2":"k2-88"},88
+89,a-89,{"k1":"k1-89", "k2":"k2-89"},89
+90,a-90,{"k1":"k1-90", "k2":"k2-90"},90
+91,a-91,{"k1":"k1-91", "k2":"k2-91"},91
+92,a-92,{"k1":"k1-92", "k2":"k2-92"},92
+93,a-93,{"k1":"k1-93", "k2":"k2-93"},93
+94,a-94,{"k1":"k1-94", "k2":"k2-94"},94
+95,a-95,{"k1":"k1-95", "k2":"k2-95"},95
+96,a-96,{"k1":"k1-96", "k2":"k2-96"},96
+97,a-97,{"k1":"k1-97", "k2":"k2-97"},97
+98,a-98,{"k1":"k1-98", "k2":"k2-98"},98
+99,a-99,{"k1":"k1-99", "k2":"k2-99"},99
\ No newline at end of file
