http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 00edc79..7ac4cc5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.catalog.Schema; @@ -54,6 +53,7 @@ import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.HashShuffleAppenderManager; +import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.fragment.FileFragment; @@ -107,39 +107,6 @@ public class Task { private Schema finalSchema = null; private TupleComparator sortComp = null; - static final String OUTPUT_FILE_PREFIX="part-"; - static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(2); - return fmt; - } - }; - static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(6); - return fmt; - } - }; - - static final 
ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); - return fmt; - } - }; - public Task(String taskRunnerId, Path baseDir, QueryUnitAttemptId taskId, @@ -189,13 +156,8 @@ public class Task { this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); } } else { - // The final result of a task will be written in a file named part-ss-nnnnnnn, - // where ss is the subquery id associated with this task, and nnnnnn is the task id. - Path outFilePath = StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME, - OUTPUT_FILE_PREFIX + - OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" + - OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()) + "-" + - OUTPUT_FILE_FORMAT_SEQ.get().format(0)); + Path outFilePath = StorageManager.getFileStorageManager(systemConf).getAppenderFilePath( + taskId, queryContext.getStagingDir()); LOG.info("Output File Path: " + outFilePath); context.setOutputPath(outFilePath); }
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/resources/webapps/worker/queryunit.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp index 18a67d8..49635d1 100644 --- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp +++ b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp @@ -41,6 +41,7 @@ <%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.Map" %> <%@ page import="java.util.Set" %> +<%@ page import="org.apache.tajo.storage.fragment.Fragment" %> <% String paramQueryId = request.getParameter("queryId"); @@ -102,8 +103,8 @@ String fragmentInfo = ""; String delim = ""; for (CatalogProtos.FragmentProto eachFragment : queryUnit.getAllFragments()) { - FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment); - fragmentInfo += delim + fileFragment.toString(); + Fragment fragment = FragmentConvertor.convert(tajoWorker.getConfig(), eachFragment); + fragmentInfo += delim + fragment.toString(); delim = "<br/>"; } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java new file mode 100644 index 0000000..a8e4a5c --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.tajo.util.Bytes; + +import java.io.File; +import java.io.IOException; + +import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY; + +public class HBaseTestClusterUtil { + private static final Log LOG = LogFactory.getLog(HBaseTestClusterUtil.class); + private Configuration conf; + private MiniHBaseCluster hbaseCluster; + private MiniZooKeeperCluster zkCluster; + private File testBaseDir; + public HBaseTestClusterUtil(Configuration conf, File testBaseDir) { + this.conf = conf; + this.testBaseDir = testBaseDir; + } + /** + * Returns the path to the default root dir the minicluster uses. + * Note: this does not cause the root dir to be created. 
+ * @return Fully qualified path for the default hbase root dir + * @throws java.io.IOException + */ + public Path getDefaultRootDirPath() throws IOException { + FileSystem fs = FileSystem.get(this.conf); + return new Path(fs.makeQualified(fs.getHomeDirectory()),"hbase"); + } + + /** + * Creates an hbase rootdir in user home directory. Also creates hbase + * version file. Normally you won't make use of this method. Root hbasedir + * is created for you as part of mini cluster startup. You'd only use this + * method if you were doing manual operation. + * @return Fully qualified path to hbase root dir + * @throws java.io.IOException + */ + public Path createRootDir() throws IOException { + FileSystem fs = FileSystem.get(this.conf); + Path hbaseRootdir = getDefaultRootDirPath(); + FSUtils.setRootDir(this.conf, hbaseRootdir); + fs.mkdirs(hbaseRootdir); + FSUtils.setVersion(fs, hbaseRootdir); + return hbaseRootdir; + } + + public void stopHBaseCluster() throws IOException { + if (hbaseCluster != null) { + LOG.info("MiniHBaseCluster stopped"); + hbaseCluster.shutdown(); + hbaseCluster.waitUntilShutDown(); + hbaseCluster = null; + } + } + + public void startHBaseCluster() throws Exception { + if (zkCluster == null) { + startMiniZKCluster(); + } + if (hbaseCluster != null) { + return; + } + + System.setProperty("HBASE_ZNODE_FILE", testBaseDir + "/hbase_znode_file"); + if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) { + conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); + } + if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) { + conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); + } + conf.setBoolean(REPLICATION_ENABLE_KEY, false); + createRootDir(); + + Configuration c = HBaseConfiguration.create(this.conf); + + hbaseCluster = new MiniHBaseCluster(c, 1); + + // Don't leave here till we've done a successful scan of the hbase:meta + HTable t = new HTable(c, TableName.META_TABLE_NAME); + 
ResultScanner s = t.getScanner(new Scan()); + while (s.next() != null) { + continue; + } + s.close(); + t.close(); + LOG.info("MiniHBaseCluster started"); + + } + + /** + * Start a mini ZK cluster. If the property "test.hbase.zookeeper.property.clientPort" is set + * the port mentionned is used as the default port for ZooKeeper. + */ + public MiniZooKeeperCluster startMiniZKCluster() + throws Exception { + File zkDataPath = new File(testBaseDir, "zk"); + if (this.zkCluster != null) { + throw new IOException("Cluster already running at " + zkDataPath); + } + this.zkCluster = new MiniZooKeeperCluster(conf); + final int defPort = this.conf.getInt("test.hbase.zookeeper.property.clientPort", 0); + if (defPort > 0){ + // If there is a port in the config file, we use it. + this.zkCluster.setDefaultClientPort(defPort); + } + int clientPort = this.zkCluster.startup(zkDataPath, 1); + this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort)); + LOG.info("MiniZooKeeperCluster started"); + + return this.zkCluster; + } + + public void stopZooKeeperCluster() throws IOException { + if (zkCluster != null) { + LOG.info("MiniZooKeeperCluster stopped"); + zkCluster.shutdown(); + zkCluster = null; + } + } + + public Configuration getConf() { + return conf; + } + + public MiniZooKeeperCluster getMiniZooKeeperCluster() { + return zkCluster; + } + + public MiniHBaseCluster getMiniHBaseCluster() { + return hbaseCluster; + } + + public HTableDescriptor getTableDescriptor(String tableName) throws IOException { + HBaseAdmin admin = new HBaseAdmin(conf); + try { + return admin.getTableDescriptor(Bytes.toBytes(tableName)); + } finally { + admin.close(); + } + } + + public void createTable(HTableDescriptor hTableDesc) throws IOException { + HBaseAdmin admin = new HBaseAdmin(conf); + try { + admin.createTable(hTableDesc); + } finally { + admin.close(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 604a56b..373774e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -169,9 +169,9 @@ public class QueryTestCaseBase { private static String currentDatabase; private static Set<String> createdTableGlobalSet = new HashSet<String>(); // queries and results directory corresponding to subclass class. - private Path currentQueryPath; - private Path currentResultPath; - private Path currentDatasetPath; + protected Path currentQueryPath; + protected Path currentResultPath; + protected Path currentDatasetPath; // for getting a method name @Rule public TestName name = new TestName(); @@ -303,7 +303,7 @@ public class QueryTestCaseBase { return executeFile(getMethodName() + ".sql"); } - private String getMethodName() { + protected String getMethodName() { String methodName = name.getMethodName(); // In the case of parameter execution name's pattern is methodName[0] if (methodName.endsWith("]")) { http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 8ddc264..d5f1a67 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -69,6 +69,7 @@ public class TajoTestingCluster { private FileSystem defaultFS; private MiniDFSCluster dfsCluster; private MiniCatalogServer catalogServer; + 
private HBaseTestClusterUtil hbaseUtil; private TajoMaster tajoMaster; private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>(); @@ -259,6 +260,10 @@ public class TajoTestingCluster { return this.defaultFS; } + public HBaseTestClusterUtil getHBaseUtil() { + return hbaseUtil; + } + //////////////////////////////////////////////////////// // Catalog Section //////////////////////////////////////////////////////// @@ -482,6 +487,8 @@ public class TajoTestingCluster { startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts); this.dfsCluster.waitClusterUp(); + hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir); + if(!standbyWorkerMode) { startMiniYarnCluster(); } @@ -569,7 +576,6 @@ public class TajoTestingCluster { } if(this.dfsCluster != null) { - try { FileSystem fs = this.dfsCluster.getFileSystem(); if (fs != null) fs.close(); @@ -588,6 +594,10 @@ public class TajoTestingCluster { } this.clusterTestBuildDir = null; } + + hbaseUtil.stopZooKeeperCluster(); + hbaseUtil.stopHBaseCluster(); + LOG.info("Minicluster is down"); } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java index 7f402a1..e331599 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java @@ -25,6 +25,7 @@ import org.apache.tajo.engine.eval.ExprTestBase; import org.junit.Test; import java.io.IOException; +import java.text.DecimalFormat; import static org.apache.tajo.common.TajoDataTypes.Type.*; 
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java index 5cc89b8..fee6f4c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java @@ -380,7 +380,7 @@ public class TestPlannerUtil { int index = 0; for (int i = startIndex; i < startIndex + expectedSize; i++, index++) { - FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), StoreType.CSV, fragments[index]); + FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), fragments[index]); assertEquals(expectedFiles.get(i), fragment.getPath()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java index 2c02405..aa36ea7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java @@ -196,8 +196,7 @@ public class TestBSTIndexExec { Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()), "Error: There is no table matched to %s", scanNode.getTableName()); - List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), meta.getStoreType(), - ctx.getTables(scanNode.getTableName())); + List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), 
ctx.getTables(scanNode.getTableName())); Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)}; http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index c86c4c7..06f4001 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -48,6 +48,7 @@ import org.apache.tajo.master.session.Session; import org.apache.tajo.plan.*; import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.serder.PlanProto.ShuffleType; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.*; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; @@ -76,7 +77,6 @@ import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm; import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm; -import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; import static org.junit.Assert.*; public class TestPhysicalPlanner { @@ -441,7 +441,6 @@ public class TestPhysicalPlanner { LogicalPlan plan = planner.createPlan(defaultContext, context); LogicalNode rootNode = optimizer.optimize(plan); - TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV); PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); 
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java new file mode 100644 index 0000000..cb9aa74 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java @@ -0,0 +1,1474 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.query; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.InclusiveStopFilter; +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.plan.expr.*; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.hbase.*; +import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.net.InetAddress; +import java.sql.ResultSet; +import java.text.DecimalFormat; +import java.util.*; + +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; + +@Category(IntegrationTest.class) +public class TestHBaseTable extends QueryTestCaseBase { + private static final Log LOG = LogFactory.getLog(TestHBaseTable.class); + + @BeforeClass + public static void beforeClass() { + try { + testingCluster.getHBaseUtil().startHBaseCluster(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @AfterClass + public static void afterClass() { + try { + testingCluster.getHBaseUtil().stopHBaseCluster(); + } catch (Exception e) 
{ + e.printStackTrace(); + } + } + + @Test + public void testVerifyCreateHBaseTableRequiredMeta() throws Exception { + try { + executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " + + "USING hbase").close(); + + fail("hbase table must have 'table' meta"); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("HBase mapped table") >= 0); + } + + try { + executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " + + "USING hbase " + + "WITH ('table'='hbase_table')").close(); + + fail("hbase table must have 'columns' meta"); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("'columns' property is required") >= 0); + } + + try { + executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " + + "USING hbase " + + "WITH ('table'='hbase_table', 'columns'='col1:,col2:')").close(); + + fail("hbase table must have 'hbase.zookeeper.quorum' meta"); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("HBase mapped table") >= 0); + } + } + + @Test + public void testCreateHBaseTable() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:a,col3:,col2:b', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table1"); + + HTableDescriptor hTableDesc = testingCluster.getHBaseUtil().getTableDescriptor("hbase_table"); + assertNotNull(hTableDesc); + assertEquals("hbase_table", hTableDesc.getNameAsString()); + + HColumnDescriptor[] hColumns = hTableDesc.getColumnFamilies(); + // col1 is mapped to rowkey + assertEquals(2, hColumns.length); + assertEquals("col2", 
hColumns[0].getNameAsString()); + assertEquals("col3", hColumns[1].getNameAsString()); + + executeString("DROP TABLE hbase_mapped_table1 PURGE").close(); + + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + try { + assertFalse(hAdmin.tableExists("hbase_table")); + } finally { + hAdmin.close(); + } + } + + @Test + public void testCreateNotExistsExternalHBaseTable() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + try { + executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:a,col3:,col2:b', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + fail("External table should be a existed table."); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("External table should be a existed table.") >= 0); + } + } + + @Test + public void testCreateRowFieldWithNonText() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + try { + executeString("CREATE TABLE hbase_mapped_table2 (rk1 int4, rk2 text, col3 text, col4 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'='0:key#b,1:key,col3:,col2:b', " + + "'hbase.rowkey.delimiter'='_', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + fail("Key field type should be TEXT type"); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("Key field type should be TEXT type") >= 0); + } + } + + @Test + public void testCreateExternalHBaseTable() throws Exception 
{ + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table_not_purge")); + hTableDesc.addFamily(new HColumnDescriptor("col1")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='external_hbase_table_not_purge', 'columns'=':key,col1:a,col2:,col3:b', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("external_hbase_mapped_table"); + + executeString("DROP TABLE external_hbase_mapped_table").close(); + + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + try { + assertTrue(hAdmin.tableExists("external_hbase_table_not_purge")); + hAdmin.disableTable("external_hbase_table_not_purge"); + hAdmin.deleteTable("external_hbase_table_not_purge"); + } finally { + hAdmin.close(); + } + } + + @Test + public void testSimpleSelectQuery() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table")); + hTableDesc.addFamily(new HColumnDescriptor("col1")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " + + 
"USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("external_hbase_mapped_table"); + + HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE)) + .getConnection(testingCluster.getHBaseUtil().getConf()); + HTableInterface htable = hconn.getTable("external_hbase_table"); + + try { + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(i).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + htable.put(put); + } + + ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); + htable.close(); + } + } + + @Test + public void testBinaryMappedQuery() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table")); + hTableDesc.addFamily(new HColumnDescriptor("col1")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk int8, col1 text, col2 text, col3 int4)\n " + + "USING hbase WITH ('table'='external_hbase_table', 
  /**
   * Verifies SELECT over an externally-created HBase table whose column family is
   * mapped as key/value pairs (the {@code col2:key:} / {@code col2:value:} mapping),
   * so multiple HBase qualifiers in one family surface as parallel Tajo columns.
   */
  @Test
  public void testColumnKeyValueSelectQuery() throws Exception {
    // Create the backing HBase table directly (not through Tajo DDL).
    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
    hTableDesc.addFamily(new HColumnDescriptor("col2"));
    hTableDesc.addFamily(new HColumnDescriptor("col3"));
    testingCluster.getHBaseUtil().createTable(hTableDesc);

    String hostName = InetAddress.getLocalHost().getHostName();
    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
    assertNotNull(zkPort);

    // Map the existing HBase table into Tajo as an EXTERNAL table.
    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, col2_key text, col2_value text, col3 text) " +
        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " +
        "'hbase.rowkey.delimiter'='_', " +
        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();

    assertTableExists("external_hbase_mapped_table");

    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
        .getConnection(testingCluster.getHBaseUtil().getConf());
    HTableInterface htable = hconn.getTable("external_hbase_table");

    try {
      // 10 rows; each row carries 5 qualifiers in col2 plus an empty-qualifier col3 value.
      for (int i = 0; i < 10; i++) {
        Put put = new Put(Bytes.toBytes("rk-" + i));
        for (int j = 0; j < 5; j++) {
          put.add("col2".getBytes(), ("key-" + j).getBytes(), Bytes.toBytes("value-" + j));
        }
        put.add("col3".getBytes(), "".getBytes(), ("col3-value-" + i).getBytes());
        htable.put(put);
      }

      // Result compared against the checked-in expected-result file for this test.
      ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 >= 'rk-0'");
      assertResultSet(res);
      cleanupQuery(res);
    } finally {
      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
      htable.close();
    }
  }
"USING hbase WITH ('table'='external_hbase_table', 'columns'='0:key,1:key,col3:a', " + + "'hbase.rowkey.delimiter'='_', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("external_hbase_mapped_table"); + + HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE)) + .getConnection(testingCluster.getHBaseUtil().getConf()); + HTableInterface htable = hconn.getTable("external_hbase_table"); + + try { + for (int i = 0; i < 100; i++) { + Put put = new Put(("field1-" + i + "_field2-" + i).getBytes()); + put.add("col3".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + htable.put(put); + } + + ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 > 'field1-20'"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); + htable.close(); + } + } + + @Test + public void testIndexPredication() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + hAdmin.tableExists("hbase_table"); + + HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + try { + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, 
keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + htable.put(put); + } + assertIndexPredication(false); + + ResultSet res = executeString("select * from hbase_mapped_table where rk >= '020' and rk <= '055'"); + assertResultSet(res); + res.close(); + + res = executeString("select * from hbase_mapped_table where rk = '021'"); + String expected = "rk,col1,col2,col3\n" + + "-------------------------------\n" + + "021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + htable.close(); + hAdmin.close(); + } + } + + @Test + public void testCompositeRowIndexPredication() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, rk2 text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'hbase.rowkey.delimiter'='_', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + hAdmin.tableExists("hbase_table"); + + HTable htable = 
new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + try { + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put((df.format(i) + "_" + df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + htable.put(put); + } + + Scan scan = new Scan(); + scan.setStartRow("021".getBytes()); + scan.setStopRow(("021_" + new String(new char[]{Character.MAX_VALUE})).getBytes()); + Filter filter = new InclusiveStopFilter(scan.getStopRow()); + scan.setFilter(filter); + + ResultScanner scanner = htable.getScanner(scan); + Result result = scanner.next(); + assertNotNull(result); + assertEquals("021_021", new String(result.getRow())); + scanner.close(); + + assertIndexPredication(true); + + ResultSet res = executeString("select * from hbase_mapped_table where rk = '021'"); + String expected = "rk,rk2,col1,col2,col3\n" + + "-------------------------------\n" + + "021,021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + htable.close(); + hAdmin.close(); + } + } + + private void assertIndexPredication(boolean isCompositeRowKey) throws Exception { + String postFix = isCompositeRowKey ? 
  /**
   * Asserts that {@code StorageManager.getSplits} turns row-key predicates on
   * "hbase_mapped_table" into the expected HBaseFragment scan ranges, given the
   * region split keys '010,040,060,080' used by the calling tests.
   *
   * @param isCompositeRowKey when true, stop rows carry the "_<MAX_VALUE>"
   *                          postfix used to close a range on the first key field
   */
  private void assertIndexPredication(boolean isCompositeRowKey) throws Exception {
    String postFix = isCompositeRowKey ? "_" + new String(new char[]{Character.MAX_VALUE}) : "";
    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");

    ScanNode scanNode = new ScanNode(1);

    // where rk = '021'  => single fragment ["021", "021"+postFix]
    EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
        new ConstEval(new TextDatum("021")));
    scanNode.setQual(evalNodeEq);
    StorageManager storageManager = StorageManager.getStorageManager(conf, StoreType.HBASE);
    List<Fragment> fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
    assertEquals(1, fragments.size());
    assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
    assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));

    // where rk >= '020' and rk <= '055'
    // => range crosses the region boundary at '040', so two fragments.
    EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
        new ConstEval(new TextDatum("020")));
    EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
        new ConstEval(new TextDatum("055")));
    EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
    scanNode.setQual(evalNodeA);

    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
    assertEquals(2, fragments.size());
    HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
    assertEquals("020", new String(fragment1.getStartRow()));
    assertEquals("040", new String(fragment1.getStopRow()));

    HBaseFragment fragment2 = (HBaseFragment) fragments.get(1);
    assertEquals("040", new String(fragment2.getStartRow()));
    assertEquals("055" + postFix, new String(fragment2.getStopRow()));

    // where (rk >= '020' and rk <= '055') or rk = '075'
    // => the disjoint point predicate adds a third fragment.
    EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
        new ConstEval(new TextDatum("075")));
    EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
    scanNode.setQual(evalNodeB);
    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
    assertEquals(3, fragments.size());
    fragment1 = (HBaseFragment) fragments.get(0);
    assertEquals("020", new String(fragment1.getStartRow()));
    assertEquals("040", new String(fragment1.getStopRow()));

    fragment2 = (HBaseFragment) fragments.get(1);
    assertEquals("040", new String(fragment2.getStartRow()));
    assertEquals("055" + postFix, new String(fragment2.getStopRow()));

    HBaseFragment fragment3 = (HBaseFragment) fragments.get(2);
    assertEquals("075", new String(fragment3.getStartRow()));
    assertEquals("075" + postFix, new String(fragment3.getStopRow()));


    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
    // => two disjoint ranges; the first still splits at '040'.
    EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
        new ConstEval(new TextDatum("072")));
    EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
        new ConstEval(new TextDatum("078")));
    EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
    EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
    scanNode.setQual(evalNodeD);
    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
    assertEquals(3, fragments.size());

    fragment1 = (HBaseFragment) fragments.get(0);
    assertEquals("020", new String(fragment1.getStartRow()));
    assertEquals("040", new String(fragment1.getStopRow()));

    fragment2 = (HBaseFragment) fragments.get(1);
    assertEquals("040", new String(fragment2.getStartRow()));
    assertEquals("055" + postFix, new String(fragment2.getStopRow()));

    fragment3 = (HBaseFragment) fragments.get(2);
    assertEquals("072", new String(fragment3.getStartRow()));
    assertEquals("078" + postFix, new String(fragment3.getStopRow()));

    // where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059')
    // => both ranges fall in the same region span, so they merge into
    //    the second fragment (stop row extends to '059').
    evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
        new ConstEval(new TextDatum("057")));
    evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
        new ConstEval(new TextDatum("059")));
    evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
    evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
    scanNode.setQual(evalNodeD);
    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
    assertEquals(2, fragments.size());

    fragment1 = (HBaseFragment) fragments.get(0);
    assertEquals("020", new String(fragment1.getStartRow()));
    assertEquals("040", new String(fragment1.getStopRow()));

    fragment2 = (HBaseFragment) fragments.get(1);
    assertEquals("040", new String(fragment2.getStartRow()));
    assertEquals("059" + postFix, new String(fragment2.getStopRow()));
  }
>= '057' and rk <= '059') + evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("057"))); + evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("059"))); + evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5); + evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC); + scanNode.setQual(evalNodeD); + fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode); + assertEquals(2, fragments.size()); + + fragment1 = (HBaseFragment) fragments.get(0); + assertEquals("020", new String(fragment1.getStartRow())); + assertEquals("040", new String(fragment1.getStopRow())); + + fragment2 = (HBaseFragment) fragments.get(1); + assertEquals("040", new String(fragment2.getStartRow())); + assertEquals("059" + postFix, new String(fragment2.getStopRow())); + } + + @Test + public void testNonForwardQuery() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:#b', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + HTable htable = null; + try { + hAdmin.tableExists("hbase_table"); + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, keys.getFirst().length); + + DecimalFormat df = 
new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "".getBytes(), Bytes.toBytes(i)); + htable.put(put); + } + + ResultSet res = executeString("select * from hbase_mapped_table"); + assertResultSet(res); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + hAdmin.close(); + if (htable == null) { + htable.close(); + } + } + } + + @Test + public void testJoin() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int8) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + HTable htable = null; + try { + hAdmin.tableExists("hbase_table"); + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + 
put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes((long) i)); + htable.put(put); + } + + ResultSet res = executeString("select a.rk, a.col1, a.col2, a.col3, b.l_orderkey, b.l_linestatus " + + "from hbase_mapped_table a " + + "join default.lineitem b on a.col3 = b.l_orderkey"); + assertResultSet(res); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + hAdmin.close(); + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertInto() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + executeString("insert into hbase_mapped_table " + + "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scan.addFamily(Bytes.toBytes("col2")); + scan.addFamily(Bytes.toBytes("col3")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), 
Bytes.toBytes("col3")}, + new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")}, + new boolean[]{false, false, false, true}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegion() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + DecimalFormat df = new DecimalFormat("000"); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + 
scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegion2() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + for (int i = 99; i >= 0; i--) { + datas.add(i + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable 
= new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegionWithSplitFile() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + String splitFilePath = currentDatasetPath + "/splits.data"; + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys.file'='" + splitFilePath + "', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + DecimalFormat df = new DecimalFormat("000"); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + 
".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegionMultiRowFields() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a', " + + "'hbase.split.rowkeys'='001,002,003,004,005,006,007,008,009', " + + "'hbase.rowkey.delimiter'='_', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id1", Type.TEXT); + schema.addColumn("id2", 
Type.TEXT); + schema.addColumn("name", Type.TEXT); + DecimalFormat df = new DecimalFormat("000"); + List<String> datas = new ArrayList<String>(); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|" + (i + 100) + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id1, id2, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, null, Bytes.toBytes("col1")}, + new byte[][]{null, null, Bytes.toBytes("a")}, + new boolean[]{false, false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoBinaryMultiRegion() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk int4, col1 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key#b,col1:a', " + + "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + 
tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + for (int i = 99; i >= 0; i--) { + datas.add(i + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{true, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoColumnKeyValue() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col2_key text, col2_value text, col3 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " + + "'hbase.rowkey.delimiter'='_', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = 
  /**
   * INSERT INTO a table using the key/value column-family mapping
   * ('col2:key:' / 'col2:value:'): multiple source rows sharing a row key
   * collapse into one HBase row whose col2 family holds one qualifier per
   * (key, value) pair; reading back yields JSON-style arrays per row.
   */
  @Test
  public void testInsertIntoColumnKeyValue() throws Exception {
    String hostName = InetAddress.getLocalHost().getHostName();
    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
    assertNotNull(zkPort);

    executeString("CREATE TABLE hbase_mapped_table (rk text, col2_key text, col2_value text, col3 text) " +
        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " +
        "'hbase.rowkey.delimiter'='_', " +
        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();

    assertTableExists("hbase_mapped_table");
    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");

    // Source CSV table: 3 (key,value) rows per row key, keys 20..0 descending.
    KeyValueSet tableOptions = new KeyValueSet();
    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");

    Schema schema = new Schema();
    schema.addColumn("rk", Type.TEXT);
    schema.addColumn("col2_key", Type.TEXT);
    schema.addColumn("col2_value", Type.TEXT);
    schema.addColumn("col3", Type.TEXT);
    List<String> datas = new ArrayList<String>();
    for (int i = 20; i >= 0; i--) {
      for (int j = 0; j < 3; j++) {
        datas.add(i + "|ck-" + j + "|value-" + j + "|col3-" + i);
      }
    }
    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
        schema, tableOptions, datas.toArray(new String[]{}), 2);

    executeString("insert into hbase_mapped_table " +
        "select rk, col2_key, col2_value, col3 from base_table ").close();

    HTable htable = null;
    ResultScanner scanner = null;
    try {
      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");

      Scan scan = new Scan();
      scan.addFamily(Bytes.toBytes("col2"));
      scan.addFamily(Bytes.toBytes("col3"));
      scanner = htable.getScanner(scan);

      assertStrings(resultSetToString(scanner,
          new byte[][]{null, Bytes.toBytes("col2"), Bytes.toBytes("col3")},
          new byte[][]{null, null, null},
          new boolean[]{false, false, false}, tableDesc.getSchema()));

      // Rows come back in lexicographic row-key order ("0","1","10","11",...).
      ResultSet res = executeString("select * from hbase_mapped_table");

      String expected = "rk,col2_key,col2_value,col3\n" +
          "-------------------------------\n" +
          "0,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-0\n" +
          "1,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-1\n" +
          "10,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-10\n" +
          "11,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-11\n" +
          "12,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-12\n" +
          "13,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-13\n" +
          "14,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-14\n" +
          "15,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-15\n" +
          "16,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-16\n" +
          "17,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-17\n" +
          "18,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-18\n" +
          "19,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-19\n" +
          "2,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-2\n" +
          "20,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-20\n" +
          "3,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-3\n" +
          "4,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-4\n" +
          "5,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-5\n" +
          "6,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-6\n" +
          "7,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-7\n" +
          "8,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-8\n" +
          "9,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-9\n";

      assertEquals(expected, resultSetToString(res));
      res.close();

    } finally {
      executeString("DROP TABLE base_table PURGE").close();
      executeString("DROP TABLE hbase_mapped_table PURGE").close();

      if (scanner != null) {
        scanner.close();
      }

      if (htable != null) {
        htable.close();
      }
    }
  }
testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + for (int i = 99; i >= 0; i--) { + datas.add(i + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + try { + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + fail("If inserting data type different with target table data type, should throw exception"); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("VerifyException") >= 0); + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + } + } + + @Test + public void testInsertIntoRowField() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " + + "'hbase.rowkey.delimiter'='_', " 
+ + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + executeString("insert into hbase_mapped_table " + + "select l_orderkey::text, l_partkey::text, l_shipdate, l_returnflag, l_suppkey::text from default.lineitem "); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scan.addFamily(Bytes.toBytes("col2")); + scan.addFamily(Bytes.toBytes("col3")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")}, + new byte[][]{null, Bytes.toBytes("a"), Bytes.toBytes(""), Bytes.toBytes("b")}, + new boolean[]{false, false, false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testCATS() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + DecimalFormat df = new DecimalFormat("000"); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|value" + 
i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')" + + " as " + + "select id, name from base_table" + ).close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoUsingPut() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = 
catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + Map<String, String> sessions = new HashMap<String, String>(); + sessions.put(HBaseStorageConstants.INSERT_PUT_MODE, "true"); + client.updateSessionVariables(sessions); + + HTable htable = null; + ResultScanner scanner = null; + try { + executeString("insert into hbase_mapped_table " + + "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem ").close(); + + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scan.addFamily(Bytes.toBytes("col2")); + scan.addFamily(Bytes.toBytes("col3")); + scanner = htable.getScanner(scan); + + // result is dirrerent with testInsertInto because l_orderkey is not unique. + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")}, + new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")}, + new boolean[]{false, false, false, true}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + client.unsetSessionVariables(TUtil.newList(HBaseStorageConstants.INSERT_PUT_MODE)); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoLocation() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); 
+ + try { + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + schema.addColumn("comment", Type.TEXT); + List<String> datas = new ArrayList<String>(); + DecimalFormat df = new DecimalFormat("000"); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|value" + i + "|comment-" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into location '/tmp/hfile_test' " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')" + + "select id, name, comment from base_table ").close(); + + FileSystem fs = testingCluster.getDefaultFileSystem(); + Path path = new Path("/tmp/hfile_test"); + assertTrue(fs.exists(path)); + + FileStatus[] files = fs.listStatus(path); + assertNotNull(files); + assertEquals(2, files.length); + + assertEquals("/tmp/hfile_test/col2", files[1].getPath().toUri().getPath()); + + int index = 1; + for (FileStatus eachFile: files) { + assertEquals("/tmp/hfile_test/col" + index, eachFile.getPath().toUri().getPath()); + for (FileStatus subFile: fs.listStatus(eachFile.getPath())) { + assertTrue(subFile.isFile()); + assertTrue(subFile.getLen() > 0); + } + index++; + } + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + } + } + + private String resultSetToString(ResultScanner scanner, + byte[][] cfNames, byte[][] qualifiers, + boolean[] binaries, + Schema schema) throws Exception { + StringBuilder sb = new StringBuilder(); + 
Result result = null; + while ( (result = scanner.next()) != null ) { + if (binaries[0]) { + sb.append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(0), result.getRow()).asChar()); + } else { + sb.append(new String(result.getRow())); + } + + for (int i = 0; i < cfNames.length; i++) { + if (cfNames[i] == null) { + //rowkey + continue; + } + if (qualifiers[i] == null) { + Map<byte[], byte[]> values = result.getFamilyMap(cfNames[i]); + if (values == null) { + sb.append(", null"); + } else { + sb.append(", {"); + String delim = ""; + for (Map.Entry<byte[], byte[]> valueEntry: values.entrySet()) { + byte[] keyBytes = valueEntry.getKey(); + byte[] valueBytes = valueEntry.getValue(); + + if (binaries[i]) { + sb.append(delim).append("\"").append(keyBytes == null ? "" : Bytes.toLong(keyBytes)).append("\""); + sb.append(": \"").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\""); + } else { + sb.append(delim).append("\"").append(keyBytes == null ? 
"" : new String(keyBytes)).append("\""); + sb.append(": \"").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\""); + } + delim = ", "; + } + sb.append("}"); + } + } else { + byte[] value = result.getValue(cfNames[i], qualifiers[i]); + if (value == null) { + sb.append(", null"); + } else { + if (binaries[i]) { + sb.append(", ").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), value)); + } else { + sb.append(", ").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), value)); + } + } + } + } + sb.append("\n"); + } + + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java index df4b25c..a16d9f0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java @@ -120,7 +120,7 @@ public class TestRangeRetrieverHandler { Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv"); fs.mkdirs(tableDir.getParent()); - Appender appender = sm.getAppender(employeeMeta, schema, tableDir); + Appender appender = ((FileStorageManager)sm).getAppender(employeeMeta, schema, tableDir); appender.init(); Tuple tuple = new VTuple(schema.size()); @@ -245,7 +245,7 @@ public class TestRangeRetrieverHandler { TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); Path tablePath = StorageUtil.concatPath(testDir, "testGetFromDescendingOrder", "table.csv"); fs.mkdirs(tablePath.getParent()); - Appender appender = sm.getAppender(meta, schema, tablePath); + Appender appender = ((FileStorageManager)sm).getAppender(meta, schema, tablePath); 
appender.init(); Tuple tuple = new VTuple(schema.size()); for (int i = (TEST_TUPLE - 1); i >= 0 ; i--) { http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data b/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data new file mode 100644 index 0000000..417d480 --- /dev/null +++ b/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data @@ -0,0 +1,4 @@ +010 +040 +060 +080 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result b/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result new file mode 100644 index 0000000..8d50bf1 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result @@ -0,0 +1,81 @@ +rk,col1,col2,col3 +------------------------------- +21,a-21,{"k1":"k1-21", "k2":"k2-21"},21 +22,a-22,{"k1":"k1-22", "k2":"k2-22"},22 +23,a-23,{"k1":"k1-23", "k2":"k2-23"},23 +24,a-24,{"k1":"k1-24", "k2":"k2-24"},24 +25,a-25,{"k1":"k1-25", "k2":"k2-25"},25 +26,a-26,{"k1":"k1-26", "k2":"k2-26"},26 +27,a-27,{"k1":"k1-27", "k2":"k2-27"},27 +28,a-28,{"k1":"k1-28", "k2":"k2-28"},28 +29,a-29,{"k1":"k1-29", "k2":"k2-29"},29 +30,a-30,{"k1":"k1-30", "k2":"k2-30"},30 +31,a-31,{"k1":"k1-31", "k2":"k2-31"},31 +32,a-32,{"k1":"k1-32", "k2":"k2-32"},32 +33,a-33,{"k1":"k1-33", "k2":"k2-33"},33 +34,a-34,{"k1":"k1-34", "k2":"k2-34"},34 +35,a-35,{"k1":"k1-35", "k2":"k2-35"},35 +36,a-36,{"k1":"k1-36", "k2":"k2-36"},36 +37,a-37,{"k1":"k1-37", "k2":"k2-37"},37 +38,a-38,{"k1":"k1-38", "k2":"k2-38"},38 +39,a-39,{"k1":"k1-39", 
"k2":"k2-39"},39 +40,a-40,{"k1":"k1-40", "k2":"k2-40"},40 +41,a-41,{"k1":"k1-41", "k2":"k2-41"},41 +42,a-42,{"k1":"k1-42", "k2":"k2-42"},42 +43,a-43,{"k1":"k1-43", "k2":"k2-43"},43 +44,a-44,{"k1":"k1-44", "k2":"k2-44"},44 +45,a-45,{"k1":"k1-45", "k2":"k2-45"},45 +46,a-46,{"k1":"k1-46", "k2":"k2-46"},46 +47,a-47,{"k1":"k1-47", "k2":"k2-47"},47 +48,a-48,{"k1":"k1-48", "k2":"k2-48"},48 +49,a-49,{"k1":"k1-49", "k2":"k2-49"},49 +50,a-50,{"k1":"k1-50", "k2":"k2-50"},50 +51,a-51,{"k1":"k1-51", "k2":"k2-51"},51 +52,a-52,{"k1":"k1-52", "k2":"k2-52"},52 +53,a-53,{"k1":"k1-53", "k2":"k2-53"},53 +54,a-54,{"k1":"k1-54", "k2":"k2-54"},54 +55,a-55,{"k1":"k1-55", "k2":"k2-55"},55 +56,a-56,{"k1":"k1-56", "k2":"k2-56"},56 +57,a-57,{"k1":"k1-57", "k2":"k2-57"},57 +58,a-58,{"k1":"k1-58", "k2":"k2-58"},58 +59,a-59,{"k1":"k1-59", "k2":"k2-59"},59 +60,a-60,{"k1":"k1-60", "k2":"k2-60"},60 +61,a-61,{"k1":"k1-61", "k2":"k2-61"},61 +62,a-62,{"k1":"k1-62", "k2":"k2-62"},62 +63,a-63,{"k1":"k1-63", "k2":"k2-63"},63 +64,a-64,{"k1":"k1-64", "k2":"k2-64"},64 +65,a-65,{"k1":"k1-65", "k2":"k2-65"},65 +66,a-66,{"k1":"k1-66", "k2":"k2-66"},66 +67,a-67,{"k1":"k1-67", "k2":"k2-67"},67 +68,a-68,{"k1":"k1-68", "k2":"k2-68"},68 +69,a-69,{"k1":"k1-69", "k2":"k2-69"},69 +70,a-70,{"k1":"k1-70", "k2":"k2-70"},70 +71,a-71,{"k1":"k1-71", "k2":"k2-71"},71 +72,a-72,{"k1":"k1-72", "k2":"k2-72"},72 +73,a-73,{"k1":"k1-73", "k2":"k2-73"},73 +74,a-74,{"k1":"k1-74", "k2":"k2-74"},74 +75,a-75,{"k1":"k1-75", "k2":"k2-75"},75 +76,a-76,{"k1":"k1-76", "k2":"k2-76"},76 +77,a-77,{"k1":"k1-77", "k2":"k2-77"},77 +78,a-78,{"k1":"k1-78", "k2":"k2-78"},78 +79,a-79,{"k1":"k1-79", "k2":"k2-79"},79 +80,a-80,{"k1":"k1-80", "k2":"k2-80"},80 +81,a-81,{"k1":"k1-81", "k2":"k2-81"},81 +82,a-82,{"k1":"k1-82", "k2":"k2-82"},82 +83,a-83,{"k1":"k1-83", "k2":"k2-83"},83 +84,a-84,{"k1":"k1-84", "k2":"k2-84"},84 +85,a-85,{"k1":"k1-85", "k2":"k2-85"},85 +86,a-86,{"k1":"k1-86", "k2":"k2-86"},86 +87,a-87,{"k1":"k1-87", "k2":"k2-87"},87 
+88,a-88,{"k1":"k1-88", "k2":"k2-88"},88 +89,a-89,{"k1":"k1-89", "k2":"k2-89"},89 +90,a-90,{"k1":"k1-90", "k2":"k2-90"},90 +91,a-91,{"k1":"k1-91", "k2":"k2-91"},91 +92,a-92,{"k1":"k1-92", "k2":"k2-92"},92 +93,a-93,{"k1":"k1-93", "k2":"k2-93"},93 +94,a-94,{"k1":"k1-94", "k2":"k2-94"},94 +95,a-95,{"k1":"k1-95", "k2":"k2-95"},95 +96,a-96,{"k1":"k1-96", "k2":"k2-96"},96 +97,a-97,{"k1":"k1-97", "k2":"k2-97"},97 +98,a-98,{"k1":"k1-98", "k2":"k2-98"},98 +99,a-99,{"k1":"k1-99", "k2":"k2-99"},99 \ No newline at end of file
