http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index bc3be04,07b47c1..b68eb2e
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@@ -438,28 -427,356 +434,28 @@@ public class Query implements EventHand
      return finalState;
    }

-   private boolean finalizeQuery(Query query, QueryCompletedEvent event) {
+   private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
-     MasterPlan masterPlan = query.getPlan();
+     SubQuery lastStage = query.getSubQuery(event.getExecutionBlockId());
+     StoreType storeType = lastStage.getTableMeta().getStoreType();
+     try {
+       LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+       CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+       TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());

-     ExecutionBlock terminal = query.getPlan().getTerminalBlock();
-     DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
+       Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType)
+           .commitOutputData(query.context.getQueryContext(),
+               lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);

-     QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
-     try {
-       Path finalOutputDir = commitOutputData(query);
+       QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
-       hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
-       return true;
+       hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
-     } catch (Throwable t) {
-       query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(t)));
+     } catch (Exception e) {
-       LOG.error(e.getMessage(), e);
+       query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
-       return false;
+       return QueryState.QUERY_ERROR;
      }
- 
++
+     return QueryState.QUERY_SUCCEEDED;
    }
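The reworked finalizeQuery() above no longer moves result files itself: it looks up the StorageManager for the last stage's store type and calls its commitOutputData(), so each storage (file-based stores, HBase, ...) can commit a query's output in its own way. A minimal sketch of that dispatch pattern; the names here (StoreKind, CommitHandler, the handler bodies) are illustrative only, not Tajo's actual API:

    import java.util.EnumMap;
    import java.util.Map;

    public class CommitDispatchSketch {
      enum StoreKind { CSV, HBASE }

      interface CommitHandler {
        // Plays the role of StorageManager.commitOutputData(): returns the final output location.
        String commitOutputData(String stagingDir) throws Exception;
      }

      static final Map<StoreKind, CommitHandler> HANDLERS = new EnumMap<StoreKind, CommitHandler>(StoreKind.class);
      static {
        // File-based stores promote the staged result directory to the final directory.
        HANDLERS.put(StoreKind.CSV, staging -> staging + "/RESULT moved to the final output dir");
        // HBase rows are already stored by the region servers, so there is nothing to move.
        HANDLERS.put(StoreKind.HBASE, staging -> "hbase table (commit is a no-op)");
      }

      public static void main(String[] args) throws Exception {
        for (StoreKind kind : StoreKind.values()) {
          System.out.println(kind + ": " + HANDLERS.get(kind).commitOutputData("/tmp/staging/q_1"));
        }
      }
    }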
-   /**
-    * It moves a result data stored in a staging output dir into a final output dir.
-    */
-   public Path commitOutputData(Query query) throws IOException {
-     QueryContext queryContext = query.context.getQueryContext();
-     Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
-     Path finalOutputDir;
-     if (queryContext.hasOutputPath()) {
-       finalOutputDir = queryContext.getOutputPath();
-       try {
-         FileSystem fs = stagingResultDir.getFileSystem(query.systemConf);
-
-         if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO
-
-           // It moves the original table into the temporary location.
-           // Then it moves the new result table into the original table location.
-           // Upon failure, it recovers the original table if possible.
-           boolean movedToOldTable = false;
-           boolean committed = false;
-           Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
-
-           if (queryContext.hasPartition()) {
-             // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
-             // renaming directory.
-             Map<Path, Path> renameDirs = TUtil.newHashMap();
-             // This is a map for recovering existing partition directory. A key is current directory and a value is
-             // temporary directory to back up.
-             Map<Path, Path> recoveryDirs = TUtil.newHashMap();
-
-             try {
-               if (!fs.exists(finalOutputDir)) {
-                 fs.mkdirs(finalOutputDir);
-               }
-
-               visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
-                   renameDirs, oldTableDir);
-
-               // Rename target partition directories
-               for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-                 // Backup existing data files for recovering
-                 if (fs.exists(entry.getValue())) {
-                   String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
-                       oldTableDir.toString());
-                   Path recoveryPath = new Path(recoveryPathString);
-                   fs.rename(entry.getValue(), recoveryPath);
-                   fs.exists(recoveryPath);
-                   recoveryDirs.put(entry.getValue(), recoveryPath);
-                 }
-                 // Delete existing directory
-                 fs.delete(entry.getValue(), true);
-                 // Rename staging directory to final output directory
-                 fs.rename(entry.getKey(), entry.getValue());
-               }
-
-             } catch (IOException ioe) {
-               // Remove created dirs
-               for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-                 fs.delete(entry.getValue(), true);
-               }
-
-               // Recovery renamed dirs
-               for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
-                 fs.delete(entry.getValue(), true);
-                 fs.rename(entry.getValue(), entry.getKey());
-               }
-
-               throw new IOException(ioe.getMessage());
-             }
-           } else { // no partition
-             try {
-
-               // if the final output dir exists, move all contents to the temporary table dir.
-               // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
-               if (fs.exists(finalOutputDir)) {
-                 fs.mkdirs(oldTableDir);
-
-                 for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
-                   fs.rename(status.getPath(), oldTableDir);
-                 }
-
-                 movedToOldTable = fs.exists(oldTableDir);
-               } else { // if the parent does not exist, make its parent directory.
-                 fs.mkdirs(finalOutputDir);
-               }
-
-               // Move the results to the final output dir.
-               for (FileStatus status : fs.listStatus(stagingResultDir)) {
-                 fs.rename(status.getPath(), finalOutputDir);
-               }
-
-               // Check the final output dir
-               committed = fs.exists(finalOutputDir);
-
-             } catch (IOException ioe) {
-               // recover the old table
-               if (movedToOldTable && !committed) {
-
-                 // if commit is failed, recover the old data
-                 for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
-                   fs.delete(status.getPath(), true);
-                 }
-
-                 for (FileStatus status : fs.listStatus(oldTableDir)) {
-                   fs.rename(status.getPath(), finalOutputDir);
-                 }
-               }
-
-               throw new IOException(ioe.getMessage());
-             }
-           }
-         } else {
-           NodeType queryType = queryContext.getCommandType();
-
-           if (queryType == NodeType.INSERT) { // INSERT INTO an existing table
-
-             NumberFormat fmt = NumberFormat.getInstance();
-             fmt.setGroupingUsed(false);
-             fmt.setMinimumIntegerDigits(3);
-
-             if (queryContext.hasPartition()) {
-               for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-                 if (eachFile.isFile()) {
-                   LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
-                   continue;
-                 }
-                 moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1);
-               }
-             } else {
-               int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
-               for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-                 moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++);
-               }
-             }
-             // checking all file moved and remove empty dir
-             verifyAllFileMoved(fs, stagingResultDir);
-             FileStatus[] files = fs.listStatus(stagingResultDir);
-             if (files != null && files.length != 0) {
-               for (FileStatus eachFile: files) {
-                 LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
-               }
-             }
-           } else { // CREATE TABLE AS SELECT (CTAS)
-             if (fs.exists(finalOutputDir)) {
-               for (FileStatus status : fs.listStatus(stagingResultDir)) {
-                 fs.rename(status.getPath(), finalOutputDir);
-               }
-             } else {
-               fs.rename(stagingResultDir, finalOutputDir);
-             }
-             LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
-           }
-         }
-
-         // remove the staging directory if the final output dir is given.
-         Path stagingDirRoot = queryContext.getStagingDir().getParent();
-         fs.delete(stagingDirRoot, true);
-
-       } catch (Throwable t) {
-         LOG.error(t);
-         throw new IOException(t);
-       }
-     } else {
-       finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
-     }
-
-     return finalOutputDir;
-   }
-
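The commitOutputData() removed above (its job now lives behind the storage manager) implements INSERT OVERWRITE as a backup-and-restore protocol: move the current table contents into a temporary "old table" directory, move the staged results into the table location, and move the old contents back if the commit fails. A self-contained sketch of that protocol, using java.nio.file on the local filesystem in place of Hadoop's FileSystem:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class OverwriteCommitSketch {
      static void commitOverwrite(Path staged, Path finalDir, Path backup) throws IOException {
        boolean movedToOldTable = false;
        try {
          if (Files.exists(finalDir)) {
            Files.move(finalDir, backup);   // step 1: back up the original table
            movedToOldTable = true;
          }
          Files.move(staged, finalDir);     // step 2: promote the staged result
        } catch (IOException e) {
          if (movedToOldTable && !Files.exists(finalDir)) {
            Files.move(backup, finalDir);   // on failure: recover the original table
          }
          throw e;
        }
      }

      public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("overwrite-commit");
        Path staged = Files.createDirectory(base.resolve("staging_result"));
        Path table = Files.createDirectory(base.resolve("table"));
        commitOverwrite(staged, table, base.resolve("old_table"));
        System.out.println("committed " + staged + " to " + table);
      }
    }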
-   /**
-    * This method sets a rename map which includes renamed staging directory to final output directory recursively.
-    * If some data files already exist, it deletes them to avoid duplicate data.
-    *
-    * @param fs
-    * @param stagingPath
-    * @param outputPath
-    * @param stagingParentPathString
-    * @throws IOException
-    */
-   private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
-                                          String stagingParentPathString,
-                                          Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
-     FileStatus[] files = fs.listStatus(stagingPath);
-
-     for(FileStatus eachFile : files) {
-       if (eachFile.isDirectory()) {
-         Path oldPath = eachFile.getPath();
-
-         // Make recover directory.
-         String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString,
-             oldTableDir.toString());
-         Path recoveryPath = new Path(recoverPathString);
-         if (!fs.exists(recoveryPath)) {
-           fs.mkdirs(recoveryPath);
-         }
-
-         visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
-             renameDirs, oldTableDir);
-         // Find last order partition for renaming
-         String newPathString = oldPath.toString().replaceAll(stagingParentPathString,
-             outputPath.toString());
-         Path newPath = new Path(newPathString);
-         if (!isLeafDirectory(fs, eachFile.getPath())) {
-           renameDirs.put(eachFile.getPath(), newPath);
-         } else {
-           if (!fs.exists(newPath)) {
-             fs.mkdirs(newPath);
-           }
-         }
-       }
-     }
-   }
-
-   private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
-     boolean retValue = false;
-
-     FileStatus[] files = fs.listStatus(path);
-     for (FileStatus file : files) {
-       if (fs.isDirectory(file.getPath())) {
-         retValue = true;
-         break;
-       }
-     }
-
-     return retValue;
-   }
-
-   private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
-     FileStatus[] files = fs.listStatus(stagingPath);
-     if (files != null && files.length != 0) {
-       for (FileStatus eachFile: files) {
-         if (eachFile.isFile()) {
-           LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
-           return false;
-         } else {
-           if (verifyAllFileMoved(fs, eachFile.getPath())) {
-             fs.delete(eachFile.getPath(), false);
-           } else {
-             return false;
-           }
-         }
-       }
-     }
-
-     return true;
-   }
-
-   /**
-    * Attach the sequence number to a path.
-    *
-    * @param path Path
-    * @param seq sequence number
-    * @param nf Number format
-    * @return New path attached with sequence number
-    * @throws IOException
-    */
-   private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
-     String[] tokens = path.getName().split("-");
-     if (tokens.length != 4) {
-       throw new IOException("Wrong result file name:" + path);
-     }
-     return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
-   }
-
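replaceFileNameSeq() above encodes the rule INSERT INTO uses to avoid clobbering files that already exist in the target directory: a result file name has four dash-separated tokens, and the last token is replaced with the next free sequence number (StorageUtil.getMaxFileSequence() supplies the current maximum). The same rule, extracted into a runnable snippet:

    import java.text.NumberFormat;

    public class FileSeqSketch {
      static String withSequence(String fileName, int seq) {
        String[] tokens = fileName.split("-");
        if (tokens.length != 4) {
          throw new IllegalArgumentException("Wrong result file name: " + fileName);
        }
        NumberFormat nf = NumberFormat.getInstance();
        nf.setGroupingUsed(false);
        nf.setMinimumIntegerDigits(3);
        return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
      }

      public static void main(String[] args) {
        // If the target dir already holds sequences up to 006, the staged file gets 007.
        System.out.println(withSequence("part-m-00000-000", 7));   // part-m-00000-007
      }
    }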
-   /**
-    * Attach the sequence number to the output file name and then move the file into the final result path.
-    *
-    * @param fs FileSystem
-    * @param stagingResultDir The staging result dir
-    * @param fileStatus The file status
-    * @param finalOutputPath Final output path
-    * @param nf Number format
-    * @param fileSeq The sequence number
-    * @throws IOException
-    */
-   private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
-                                           FileStatus fileStatus, Path finalOutputPath,
-                                           NumberFormat nf,
-                                           int fileSeq) throws IOException {
-     if (fileStatus.isDirectory()) {
-       String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-       if (subPath != null) {
-         Path finalSubPath = new Path(finalOutputPath, subPath);
-         if (!fs.exists(finalSubPath)) {
-           fs.mkdirs(finalSubPath);
-         }
-         int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
-         for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
-           moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq);
-         }
-       } else {
-         throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
-       }
-     } else {
-       String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-       if (subPath != null) {
-         Path finalSubPath = new Path(finalOutputPath, subPath);
-         finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
-         if (!fs.exists(finalSubPath.getParent())) {
-           fs.mkdirs(finalSubPath.getParent());
-         }
-         if (fs.exists(finalSubPath)) {
-           throw new IOException("Already exists data file:" + finalSubPath);
-         }
-         boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
-         if (success) {
-           LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
-               "to final output[" + finalSubPath + "]");
-         } else {
-           LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
-               "to final output[" + finalSubPath + "]");
-         }
-       }
-     }
-   }
-
-   private String extractSubPath(Path parentPath, Path childPath) {
-     String parentPathStr = parentPath.toUri().getPath();
-     String childPathStr = childPath.toUri().getPath();
-
-     if (parentPathStr.length() > childPathStr.length()) {
-       return null;
-     }
-
-     int index = childPathStr.indexOf(parentPathStr);
-     if (index != 0) {
-       return null;
-     }
-
-     return childPathStr.substring(parentPathStr.length() + 1);
-   }
-
    private static interface QueryHook {
      boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
      void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
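The QueryHook interface above follows a guarded-hook pattern: QueryHookExecutor asks each registered hook isEligible(...) for the finished query and calls execute(...) only on the ones that apply, e.g. registering a CTAS result in the catalog. A simplified sketch with String stand-ins for Tajo's QueryContext/Query/Path parameters:

    import java.util.ArrayList;
    import java.util.List;

    public class QueryHookSketch {
      interface QueryHook {
        boolean isEligible(String commandType);
        void execute(String finalOutputDir);
      }

      static class QueryHookExecutor {
        private final List<QueryHook> hookList = new ArrayList<QueryHook>();
        void addHook(QueryHook hook) { hookList.add(hook); }
        void execute(String commandType, String finalOutputDir) {
          for (QueryHook hook : hookList) {
            if (hook.isEligible(commandType)) {
              hook.execute(finalOutputDir);
            }
          }
        }
      }

      public static void main(String[] args) {
        QueryHookExecutor executor = new QueryHookExecutor();
        executor.addHook(new QueryHook() {
          public boolean isEligible(String commandType) { return "CREATE_TABLE".equals(commandType); }
          public void execute(String dir) { System.out.println("register table stored at " + dir); }
        });
        executor.execute("CREATE_TABLE", "/warehouse/t1");
      }
    }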
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index 00b95ac,00b95ac..d4e0752
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@@ -24,7 -24,7 +24,7 @@@ import org.apache.tajo.QueryId
  import org.apache.tajo.TajoProtos;
  import org.apache.tajo.engine.json.CoreGsonHelper;
  import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
--import org.apache.tajo.json.GsonObject;
++import org.apache.tajo.storage.json.GsonObject;
  import org.apache.tajo.util.TajoIdUtils;
  import org.apache.tajo.util.history.History;

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 6b61d04,75d8ab6..1eaef0f
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@@ -62,7 -58,7 +62,8 @@@ import org.apache.tajo.rpc.CallFuture
  import org.apache.tajo.rpc.NettyClientBase;
  import org.apache.tajo.rpc.RpcConnectionPool;
  import org.apache.tajo.storage.StorageManager;
 +import org.apache.tajo.storage.StorageProperty;
+ import org.apache.tajo.storage.StorageUtil;
  import org.apache.tajo.util.HAServiceUtil;
  import org.apache.tajo.util.metrics.TajoMetrics;
  import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
@@@ -476,7 -441,14 +479,20 @@@ public class QueryMasterTask extends Co
      // Create Output Directory
      ////////////////////////////////////////////

-     stagingDir = new Path(TajoConf.getStagingDir(conf), queryId);
++    String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, "");
+     if (context.isCreateTable() || context.isInsert()) {
-       stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
++      if (outputPath == null || outputPath.isEmpty()) {
++        // hbase
++        stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
++      } else {
++        stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
++      }
+     } else {
+       stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+     }
+
+     // initialize the staging directory's file system
+     fs = stagingDir.getFileSystem(conf);

      if (fs.exists(stagingDir)) {
        throw new IOException("The staging directory '" + stagingDir + "' already exists");
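The QueryMasterTask hunk above changes how the staging directory is chosen: CREATE TABLE and INSERT statements that have a filesystem output path stage under that path, while everything else, including HBase-backed tables that have no output path at all, stages under the default root staging directory. A condensed sketch of that decision (method and parameter names are illustrative, not Tajo's API):

    public class StagingDirSketch {
      static String chooseStagingDir(boolean createTableOrInsert, String outputPath,
                                     String defaultRootStagingDir, String queryId) {
        if (createTableOrInsert && outputPath != null && !outputPath.isEmpty()) {
          // stage next to the target table, e.g. /warehouse/t1/.staging/q_1
          return outputPath + "/.staging/" + queryId;
        }
        // no output path (e.g. an HBase mapped table) or a plain SELECT
        return defaultRootStagingDir + "/" + queryId;
      }

      public static void main(String[] args) {
        System.out.println(chooseStagingDir(true, "/warehouse/t1", "/tmp/tajo-staging", "q_1"));
        System.out.println(chooseStagingDir(true, "", "/tmp/tajo-staging", "q_2"));   // hbase
        System.out.println(chooseStagingDir(false, null, "/tmp/tajo-staging", "q_3"));
      }
    }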
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 745456a,39bb7ed..7f05fa4
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@@ -57,7 -57,8 +57,9 @@@ import org.apache.tajo.master.TaskRunne
  import org.apache.tajo.master.event.*;
  import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
  import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+ import org.apache.tajo.master.container.TajoContainer;
+ import org.apache.tajo.master.container.TajoContainerId;
 +import org.apache.tajo.storage.FileStorageManager;
  import org.apache.tajo.plan.util.PlannerUtil;
  import org.apache.tajo.plan.logical.*;
  import org.apache.tajo.storage.StorageManager;

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
index 7a81b4b,7a81b4b..aaf5754
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
@@@ -22,7 -22,7 +22,7 @@@ import com.google.gson.annotations.Expo
  import org.apache.tajo.engine.json.CoreGsonHelper;
  import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto;
  import org.apache.tajo.ipc.ClientProtos.SubQueryHistoryProto;
--import org.apache.tajo.json.GsonObject;
++import org.apache.tajo.storage.json.GsonObject;
  import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto;

  import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java
index 556a971,556a971..126e3fe
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java
@@@ -20,7 -20,7 +20,7 @@@ package org.apache.tajo.util.history

  import com.google.gson.annotations.Expose;
  import org.apache.tajo.engine.json.CoreGsonHelper;
--import org.apache.tajo.json.GsonObject;
++import org.apache.tajo.storage.json.GsonObject;

  public class QueryUnitHistory implements GsonObject {
    @Expose private String id;

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
index b3ac4d2,b3ac4d2..17b9ec7
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
@@@ -22,7 -22,7 +22,7 @@@ import com.google.gson.annotations.Expo
  import com.google.gson.reflect.TypeToken;
  import org.apache.tajo.engine.json.CoreGsonHelper;
  import org.apache.tajo.ipc.ClientProtos.SubQueryHistoryProto;
--import org.apache.tajo.json.GsonObject;
++import org.apache.tajo.storage.json.GsonObject;

  import java.util.ArrayList;
  import java.util.List;
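The TestHBaseTable suite below revolves around the 'columns' property of an HBase mapped table: each comma-separated entry binds a Tajo column to ':key' (the rowkey), 'family:qualifier' (one cell), or 'family:' (the whole family as a map), a '#b' suffix requests binary encoding, and a leading '0:key'/'1:key' picks one field of a delimited composite rowkey. A small interpreter for that syntax, written only to illustrate the notation (it is not Tajo's parser):

    public class ColumnMappingSketch {
      public static void main(String[] args) {
        String columns = ":key,col2:a,col3:,col2:b#b";   // from testCreateHBaseTable below
        for (String mapping : columns.split(",")) {
          boolean binary = mapping.endsWith("#b");
          String core = binary ? mapping.substring(0, mapping.length() - 2) : mapping;
          String target;
          if (core.endsWith(":key")) {
            target = "rowkey" + (core.equals(":key") ? "" : " field " + core.substring(0, core.indexOf(':')));
          } else if (core.endsWith(":")) {
            target = "whole column family '" + core.substring(0, core.length() - 1) + "'";
          } else {
            target = "cell '" + core + "'";
          }
          System.out.println(mapping + " -> " + target + (binary ? " (binary)" : " (text)"));
        }
      }
    }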
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index cb9aa74,0000000..db8eb84
mode 100644,000000..100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@@ -1,1474 -1,0 +1,1469 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.query; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.InclusiveStopFilter; +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.plan.expr.*; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.hbase.*; +import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.net.InetAddress; +import java.sql.ResultSet; +import java.text.DecimalFormat; +import java.util.*; + +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; + +@Category(IntegrationTest.class) +public class TestHBaseTable extends QueryTestCaseBase { + private static final Log LOG = LogFactory.getLog(TestHBaseTable.class); + + @BeforeClass + public static void beforeClass() { + try { + testingCluster.getHBaseUtil().startHBaseCluster(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @AfterClass + public static void afterClass() { + try { + testingCluster.getHBaseUtil().stopHBaseCluster(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testVerifyCreateHBaseTableRequiredMeta() throws Exception { + try { + executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " + + "USING hbase").close(); + + fail("hbase table must have 'table' meta"); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("HBase mapped table") >= 0); + } + + try { + executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " + + "USING hbase " + + "WITH ('table'='hbase_table')").close(); + + fail("hbase table must have 'columns' meta"); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("'columns' property is required") >= 0); + } + + try { + executeString("CREATE 
TABLE hbase_mapped_table1 (col1 text, col2 text) " + + "USING hbase " + + "WITH ('table'='hbase_table', 'columns'='col1:,col2:')").close(); + + fail("hbase table must have 'hbase.zookeeper.quorum' meta"); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("HBase mapped table") >= 0); + } + } + + @Test + public void testCreateHBaseTable() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:a,col3:,col2:b', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table1"); + + HTableDescriptor hTableDesc = testingCluster.getHBaseUtil().getTableDescriptor("hbase_table"); + assertNotNull(hTableDesc); + assertEquals("hbase_table", hTableDesc.getNameAsString()); + + HColumnDescriptor[] hColumns = hTableDesc.getColumnFamilies(); + // col1 is mapped to rowkey + assertEquals(2, hColumns.length); + assertEquals("col2", hColumns[0].getNameAsString()); + assertEquals("col3", hColumns[1].getNameAsString()); + + executeString("DROP TABLE hbase_mapped_table1 PURGE").close(); + + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + try { + assertFalse(hAdmin.tableExists("hbase_table")); + } finally { + hAdmin.close(); + } + } + + @Test + public void testCreateNotExistsExternalHBaseTable() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + try { + executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:a,col3:,col2:b', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + fail("External table should be a existed table."); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("External table should be a existed table.") >= 0); + } + } + + @Test + public void testCreateRowFieldWithNonText() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + try { + executeString("CREATE TABLE hbase_mapped_table2 (rk1 int4, rk2 text, col3 text, col4 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'='0:key#b,1:key,col3:,col2:b', " + + "'hbase.rowkey.delimiter'='_', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + fail("Key field type should be TEXT type"); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("Key field type should be TEXT type") >= 0); + } + } + + @Test + public void testCreateExternalHBaseTable() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table_not_purge")); + hTableDesc.addFamily(new HColumnDescriptor("col1")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + 
testingCluster.getHBaseUtil().createTable(hTableDesc); + + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='external_hbase_table_not_purge', 'columns'=':key,col1:a,col2:,col3:b', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("external_hbase_mapped_table"); + + executeString("DROP TABLE external_hbase_mapped_table").close(); + + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + try { + assertTrue(hAdmin.tableExists("external_hbase_table_not_purge")); + hAdmin.disableTable("external_hbase_table_not_purge"); + hAdmin.deleteTable("external_hbase_table_not_purge"); + } finally { + hAdmin.close(); + } + } + + @Test + public void testSimpleSelectQuery() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table")); + hTableDesc.addFamily(new HColumnDescriptor("col1")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("external_hbase_mapped_table"); + + HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE)) + .getConnection(testingCluster.getHBaseUtil().getConf()); + HTableInterface htable = hconn.getTable("external_hbase_table"); + + try { + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(i).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + htable.put(put); + } + + ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); + htable.close(); + } + } + + @Test + public void testBinaryMappedQuery() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table")); + hTableDesc.addFamily(new HColumnDescriptor("col1")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + 
executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk int8, col1 text, col2 text, col3 int4)\n " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key#b,col1:a,col2:,col3:b#b', \n" + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "', \n" + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("external_hbase_mapped_table"); + + HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE)) + .getConnection(testingCluster.getHBaseUtil().getConf()); + HTableInterface htable = hconn.getTable("external_hbase_table"); + + try { + for (int i = 0; i < 100; i++) { + Put put = new Put(Bytes.toBytes((long) i)); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes(i)); + htable.put(put); + } + + ResultSet res = executeString("select * from external_hbase_mapped_table where rk > 20"); + assertResultSet(res); + res.close(); + + //Projection + res = executeString("select col3, col2, rk from external_hbase_mapped_table where rk > 95"); + + String expected = "col3,col2,rk\n" + + "-------------------------------\n" + + "96,{\"k1\":\"k1-96\", \"k2\":\"k2-96\"},96\n" + + "97,{\"k1\":\"k1-97\", \"k2\":\"k2-97\"},97\n" + + "98,{\"k1\":\"k1-98\", \"k2\":\"k2-98\"},98\n" + + "99,{\"k1\":\"k1-99\", \"k2\":\"k2-99\"},99\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + + } finally { + executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); + htable.close(); + } + } + + @Test + public void testColumnKeyValueSelectQuery() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, col2_key text, col2_value text, col3 text) " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " + + "'hbase.rowkey.delimiter'='_', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("external_hbase_mapped_table"); + + HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE)) + .getConnection(testingCluster.getHBaseUtil().getConf()); + HTableInterface htable = hconn.getTable("external_hbase_table"); + + try { + for (int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes("rk-" + i)); + for (int j = 0; j < 5; j++) { + put.add("col2".getBytes(), ("key-" + j).getBytes(), Bytes.toBytes("value-" + j)); + } + put.add("col3".getBytes(), "".getBytes(), ("col3-value-" + i).getBytes()); + htable.put(put); + } + + ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 >= 'rk-0'"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE external_hbase_mapped_table 
PURGE").close(); + htable.close(); + } + } + + @Test + public void testRowFieldSelectQuery() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, rk2 text, col3 text) " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'='0:key,1:key,col3:a', " + + "'hbase.rowkey.delimiter'='_', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("external_hbase_mapped_table"); + + HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE)) + .getConnection(testingCluster.getHBaseUtil().getConf()); + HTableInterface htable = hconn.getTable("external_hbase_table"); + + try { + for (int i = 0; i < 100; i++) { + Put put = new Put(("field1-" + i + "_field2-" + i).getBytes()); + put.add("col3".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + htable.put(put); + } + + ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 > 'field1-20'"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); + htable.close(); + } + } + + @Test + public void testIndexPredication() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + hAdmin.tableExists("hbase_table"); + + HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + try { + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + htable.put(put); + } + assertIndexPredication(false); + + ResultSet res = executeString("select * from hbase_mapped_table where rk >= '020' and rk <= '055'"); + assertResultSet(res); + res.close(); + + res = executeString("select * from hbase_mapped_table where rk = '021'"); + String expected = "rk,col1,col2,col3\n" + + "-------------------------------\n" + + "021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n"; + + 
assertEquals(expected, resultSetToString(res)); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + htable.close(); + hAdmin.close(); + } + } + + @Test + public void testCompositeRowIndexPredication() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, rk2 text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'hbase.rowkey.delimiter'='_', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + hAdmin.tableExists("hbase_table"); + + HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + try { + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put((df.format(i) + "_" + df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + htable.put(put); + } + + Scan scan = new Scan(); + scan.setStartRow("021".getBytes()); + scan.setStopRow(("021_" + new String(new char[]{Character.MAX_VALUE})).getBytes()); + Filter filter = new InclusiveStopFilter(scan.getStopRow()); + scan.setFilter(filter); + + ResultScanner scanner = htable.getScanner(scan); + Result result = scanner.next(); + assertNotNull(result); + assertEquals("021_021", new String(result.getRow())); + scanner.close(); + + assertIndexPredication(true); + + ResultSet res = executeString("select * from hbase_mapped_table where rk = '021'"); + String expected = "rk,rk2,col1,col2,col3\n" + + "-------------------------------\n" + + "021,021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + htable.close(); + hAdmin.close(); + } + } + + private void assertIndexPredication(boolean isCompositeRowKey) throws Exception { + String postFix = isCompositeRowKey ? 
"_" + new String(new char[]{Character.MAX_VALUE}) : ""; + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + ScanNode scanNode = new ScanNode(1); + + // where rk = '021' + EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("021"))); + scanNode.setQual(evalNodeEq); + StorageManager storageManager = StorageManager.getStorageManager(conf, StoreType.HBASE); + List<Fragment> fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode); + assertEquals(1, fragments.size()); + assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow())); + assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow())); + + // where rk >= '020' and rk <= '055' + EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("020"))); + EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("055"))); + EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2); + scanNode.setQual(evalNodeA); + + fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode); + assertEquals(2, fragments.size()); + HBaseFragment fragment1 = (HBaseFragment) fragments.get(0); + assertEquals("020", new String(fragment1.getStartRow())); + assertEquals("040", new String(fragment1.getStopRow())); + + HBaseFragment fragment2 = (HBaseFragment) fragments.get(1); + assertEquals("040", new String(fragment2.getStartRow())); + assertEquals("055" + postFix, new String(fragment2.getStopRow())); + + // where (rk >= '020' and rk <= '055') or rk = '075' + EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("075"))); + EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3); + scanNode.setQual(evalNodeB); + fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode); + assertEquals(3, fragments.size()); + fragment1 = (HBaseFragment) fragments.get(0); + assertEquals("020", new String(fragment1.getStartRow())); + assertEquals("040", new String(fragment1.getStopRow())); + + fragment2 = (HBaseFragment) fragments.get(1); + assertEquals("040", new String(fragment2.getStartRow())); + assertEquals("055" + postFix, new String(fragment2.getStopRow())); + + HBaseFragment fragment3 = (HBaseFragment) fragments.get(2); + assertEquals("075", new String(fragment3.getStartRow())); + assertEquals("075" + postFix, new String(fragment3.getStopRow())); + + + // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078') + EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("072"))); + EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("078"))); + EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5); + EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC); + scanNode.setQual(evalNodeD); + fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode); + assertEquals(3, fragments.size()); + + fragment1 = (HBaseFragment) fragments.get(0); + assertEquals("020", new String(fragment1.getStartRow())); + assertEquals("040", new 
String(fragment1.getStopRow())); + + fragment2 = (HBaseFragment) fragments.get(1); + assertEquals("040", new String(fragment2.getStartRow())); + assertEquals("055" + postFix, new String(fragment2.getStopRow())); + + fragment3 = (HBaseFragment) fragments.get(2); + assertEquals("072", new String(fragment3.getStartRow())); + assertEquals("078" + postFix, new String(fragment3.getStopRow())); + + // where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059') + evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("057"))); + evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("059"))); + evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5); + evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC); + scanNode.setQual(evalNodeD); + fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode); + assertEquals(2, fragments.size()); + + fragment1 = (HBaseFragment) fragments.get(0); + assertEquals("020", new String(fragment1.getStartRow())); + assertEquals("040", new String(fragment1.getStopRow())); + + fragment2 = (HBaseFragment) fragments.get(1); + assertEquals("040", new String(fragment2.getStartRow())); + assertEquals("059" + postFix, new String(fragment2.getStopRow())); + } + + @Test + public void testNonForwardQuery() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:#b', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + HTable htable = null; + try { + hAdmin.tableExists("hbase_table"); + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "".getBytes(), Bytes.toBytes(i)); + htable.put(put); + } + + ResultSet res = executeString("select * from hbase_mapped_table"); + assertResultSet(res); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + hAdmin.close(); + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testJoin() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int8) " + + "USING hbase WITH 
('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + HTable htable = null; + try { + hAdmin.tableExists("hbase_table"); + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes((long) i)); + htable.put(put); + } + + ResultSet res = executeString("select a.rk, a.col1, a.col2, a.col3, b.l_orderkey, b.l_linestatus " + + "from hbase_mapped_table a " + + "join default.lineitem b on a.col3 = b.l_orderkey"); + assertResultSet(res); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + hAdmin.close(); + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertInto() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + executeString("insert into hbase_mapped_table " + + "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scan.addFamily(Bytes.toBytes("col2")); + scan.addFamily(Bytes.toBytes("col3")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")}, + new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")}, + new boolean[]{false, false, false, true}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegion() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " + + "USING 
hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + DecimalFormat df = new DecimalFormat("000"); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegion2() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + for (int i = 99; i >= 0; i--) { + datas.add(i + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, 
Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegionWithSplitFile() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + String splitFilePath = currentDatasetPath + "/splits.data"; + + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys.file'='" + splitFilePath + "', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + DecimalFormat df = new DecimalFormat("000"); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegionMultiRowFields() throws Exception { + String hostName = InetAddress.getLocalHost().getHostName(); + String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(zkPort); + + executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text) " + + "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a', " + + "'hbase.split.rowkeys'='001,002,003,004,005,006,007,008,009', " + + "'hbase.rowkey.delimiter'='_', " + + "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," + + "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, 
+
+  @Test
+  public void testInsertIntoMultiRegionMultiRowFields() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a', " +
+        "'hbase.split.rowkeys'='001,002,003,004,005,006,007,008,009', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id1", Type.TEXT);
+    schema.addColumn("id2", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    DecimalFormat df = new DecimalFormat("000");
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|" + (i + 100) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id1, id2, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, null, Bytes.toBytes("col1")},
+          new byte[][]{null, null, Bytes.toBytes("a")},
+          new boolean[]{false, false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoBinaryMultiRegion() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk int4, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key#b,col1:a', " +
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(i + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{true, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
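[Editor's note] The '#b' suffix in the ':key#b' mapping above requests binary encoding of the INT4 row key. The point is ordering: HBase compares row keys as unsigned bytes, so text-encoded numbers sort lexicographically ("10" before "9") while the 4-byte big-endian encoding from Bytes.toBytes(int) preserves numeric order for non-negative values. A small self-contained illustration using only the HBase Bytes utilities (nothing here is from the patch):

    import org.apache.hadoop.hbase.util.Bytes;

    public class RowKeyEncodingSketch {
      public static void main(String[] args) {
        byte[] textNine = Bytes.toBytes("9");   // 1 byte:  0x39
        byte[] textTen  = Bytes.toBytes("10");  // 2 bytes: 0x31 0x30
        byte[] binNine  = Bytes.toBytes(9);     // 4 bytes: 00 00 00 09
        byte[] binTen   = Bytes.toBytes(10);    // 4 bytes: 00 00 00 0a

        // Lexicographic (HBase) comparison: text "10" sorts before "9" ...
        System.out.println(Bytes.compareTo(textTen, textNine) < 0);  // true
        // ... while binary 9 sorts before binary 10, matching numeric order.
        System.out.println(Bytes.compareTo(binNine, binTen) < 0);    // true
      }
    }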
+
+  @Test
+  public void testInsertIntoColumnKeyValue() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col2_key text, col2_value text, col3 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("rk", Type.TEXT);
+    schema.addColumn("col2_key", Type.TEXT);
+    schema.addColumn("col2_value", Type.TEXT);
+    schema.addColumn("col3", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 20; i >= 0; i--) {
+      for (int j = 0; j < 3; j++) {
+        datas.add(i + "|ck-" + j + "|value-" + j + "|col3-" + i);
+      }
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select rk, col2_key, col2_value, col3 from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, null, null},
+          new boolean[]{false, false, false}, tableDesc.getSchema()));
+
+      ResultSet res = executeString("select * from hbase_mapped_table");
+
+      String expected = "rk,col2_key,col2_value,col3\n" +
+          "-------------------------------\n" +
+          "0,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-0\n" +
+          "1,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-1\n" +
+          "10,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-10\n" +
+          "11,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-11\n" +
+          "12,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-12\n" +
+          "13,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-13\n" +
+          "14,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-14\n" +
+          "15,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-15\n" +
+          "16,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-16\n" +
+          "17,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-17\n" +
+          "18,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-18\n" +
+          "19,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-19\n" +
+          "2,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-2\n" +
+          "20,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-20\n" +
+          "3,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-3\n" +
+          "4,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-4\n" +
+          "5,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-5\n" +
+          "6,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-6\n" +
+          "7,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-7\n" +
+          "8,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-8\n" +
+          "9,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-9\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
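[Editor's note] The 'col2:key:' / 'col2:value:' mapping above exposes every qualifier and cell value in a family as arrays, which is why the expected output shows ["ck-0", "ck-1", "ck-2"] per row. A minimal sketch of reading one whole family back through the plain HBase client, the shape of data that mapping surfaces (table, family, and row key are taken from the test; the Get-based access is illustrative only):

    import java.util.Map;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FamilyMapSketch {
      public static void main(String[] args) throws Exception {
        HTable htable = new HTable(HBaseConfiguration.create(), "hbase_table");
        try {
          Result result = htable.get(new Get(Bytes.toBytes("0")));
          // Qualifier -> value for every cell in family 'col2', e.g.
          // {"ck-0": "value-0", "ck-1": "value-1", "ck-2": "value-2"}.
          Map<byte[], byte[]> family = result.getFamilyMap(Bytes.toBytes("col2"));
          if (family != null) {
            for (Map.Entry<byte[], byte[]> cell : family.entrySet()) {
              System.out.println(new String(cell.getKey()) + " = " + new String(cell.getValue()));
            }
          }
        } finally {
          htable.close();
        }
      }
    }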
+
+  @Test
+  public void testInsertIntoDifferentType() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(i + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    try {
+      executeString("insert into hbase_mapped_table " +
+          "select id, name from base_table ").close();
+      fail("Inserting a data type different from the target table's data type should throw an exception");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().indexOf("VerifyException") >= 0);
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+    }
+  }
+
+  @Test
+  public void testInsertIntoRowField() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    executeString("insert into hbase_mapped_table " +
+        "select l_orderkey::text, l_partkey::text, l_shipdate, l_returnflag, l_suppkey::text from default.lineitem ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), Bytes.toBytes(""), Bytes.toBytes("b")},
+          new boolean[]{false, false, false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
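[Editor's note] testInsertIntoRowField above maps two leading columns onto one HBase row key via '0:key,1:key' with 'hbase.rowkey.delimiter'='_'. A tiny sketch of the composite key that mapping implies; the helper name and the sample values are hypothetical, not taken from the patch:

    import org.apache.hadoop.hbase.util.Bytes;

    public class CompositeRowKeySketch {
      // Join two key fields with the configured delimiter into one row key.
      static byte[] compositeKey(String rk1, String rk2, char delimiter) {
        return Bytes.toBytes(rk1 + delimiter + rk2);
      }

      public static void main(String[] args) {
        // e.g. l_orderkey "1" and l_partkey "155190" become row key "1_155190"
        System.out.println(Bytes.toString(compositeKey("1", "155190", '_')));
      }
    }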
+
+  @Test
+  public void testCTAS() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    DecimalFormat df = new DecimalFormat("000");
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')" +
+        " as " +
+        "select id, name from base_table"
+    ).close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoUsingPut() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    Map<String, String> sessions = new HashMap<String, String>();
+    sessions.put(HBaseStorageConstants.INSERT_PUT_MODE, "true");
+    client.updateSessionVariables(sessions);
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      executeString("insert into hbase_mapped_table " +
+          "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem ").close();
+
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      // result is different from testInsertInto because l_orderkey is not unique.
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
+          new boolean[]{false, false, false, true}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      client.unsetSessionVariables(TUtil.newList(HBaseStorageConstants.INSERT_PUT_MODE));
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoLocation() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+
+    try {
+      // create test table
+      KeyValueSet tableOptions = new KeyValueSet();
+      tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+      tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.TEXT);
+      schema.addColumn("name", Type.TEXT);
+      schema.addColumn("comment", Type.TEXT);
+      List<String> datas = new ArrayList<String>();
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 99; i >= 0; i--) {
+        datas.add(df.format(i) + "|value" + i + "|comment-" + i);
+      }
+      TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+          schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+      executeString("insert into location '/tmp/hfile_test' " +
-          "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:', " +
-          "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-          "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')" +
+          "select id, name, comment from base_table ").close();
+
+      FileSystem fs = testingCluster.getDefaultFileSystem();
+      Path path = new Path("/tmp/hfile_test");
+      assertTrue(fs.exists(path));
+
+      FileStatus[] files = fs.listStatus(path);
+      assertNotNull(files);
+      assertEquals(2, files.length);
+
-      assertEquals("/tmp/hfile_test/col2", files[1].getPath().toUri().getPath());
-
-      int index = 1;
++      int index = 0;
+      for (FileStatus eachFile: files) {
-        assertEquals("/tmp/hfile_test/col" + index, eachFile.getPath().toUri().getPath());
++        assertEquals("/tmp/hfile_test/part-01-00000" + index + "-00" + index, eachFile.getPath().toUri().getPath());
+        for (FileStatus subFile: fs.listStatus(eachFile.getPath())) {
+          assertTrue(subFile.isFile());
+          assertTrue(subFile.getLen() > 0);
+        }
+        index++;
+      }
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+    }
+  }
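[Editor's note] testInsertIntoLocation writes HFiles under '/tmp/hfile_test' rather than loading the HBase table directly. For reference, a hedged sketch of the standard HBase bulk-load entry point such output is typically handed to afterwards. Note the assumption: completebulkload expects one subdirectory per column family containing HFiles, and whether the part-file layout asserted above maps onto that directly is not stated by this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable htable = new HTable(conf, "hbase_table");
        try {
          // Moves HFiles found under the family directories into the
          // table's regions (the 'completebulkload' code path).
          new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/hfile_test"), htable);
        } finally {
          htable.close();
        }
      }
    }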
+
+  private String resultSetToString(ResultScanner scanner,
+                                   byte[][] cfNames, byte[][] qualifiers,
+                                   boolean[] binaries,
+                                   Schema schema) throws Exception {
+    StringBuilder sb = new StringBuilder();
+    Result result = null;
+    while ( (result = scanner.next()) != null ) {
+      if (binaries[0]) {
+        sb.append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(0), result.getRow()).asChar());
+      } else {
+        sb.append(new String(result.getRow()));
+      }
+
+      for (int i = 0; i < cfNames.length; i++) {
+        if (cfNames[i] == null) {
+          // rowkey
+          continue;
+        }
+        if (qualifiers[i] == null) {
+          Map<byte[], byte[]> values = result.getFamilyMap(cfNames[i]);
+          if (values == null) {
+            sb.append(", null");
+          } else {
+            sb.append(", {");
+            String delim = "";
+            for (Map.Entry<byte[], byte[]> valueEntry: values.entrySet()) {
+              byte[] keyBytes = valueEntry.getKey();
+              byte[] valueBytes = valueEntry.getValue();
+
+              if (binaries[i]) {
+                sb.append(delim).append("\"").append(keyBytes == null ? "" : Bytes.toLong(keyBytes)).append("\"");
+                sb.append(": \"").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\"");
+              } else {
+                sb.append(delim).append("\"").append(keyBytes == null ? "" : new String(keyBytes)).append("\"");
+                sb.append(": \"").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\"");
+              }
+              delim = ", ";
+            }
+            sb.append("}");
+          }
+        } else {
+          byte[] value = result.getValue(cfNames[i], qualifiers[i]);
+          if (value == null) {
+            sb.append(", null");
+          } else {
+            if (binaries[i]) {
+              sb.append(", ").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), value));
+            } else {
+              sb.append(", ").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), value));
+            }
+          }
+        }
+      }
+      sb.append("\n");
+    }
+
+    return sb.toString();
+  }
+}
