http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java new file mode 100644 index 0000000..4ff5e3e --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java @@ -0,0 +1,1408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.query; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.InclusiveStopFilter; +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.error.Errors.ResultCode; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.expr.*; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.storage.Tablespace; +import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.hbase.*; +import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.URI; +import java.sql.ResultSet; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +@Category(IntegrationTest.class) +public class TestHBaseTable extends QueryTestCaseBase { + private static 
final Log LOG = LogFactory.getLog(TestHBaseTable.class); + + private static String tableSpaceUri; + private static String hostName,zkPort; + + @BeforeClass + public static void beforeClass() throws IOException { + try { + testingCluster.getHBaseUtil().startHBaseCluster(); + hostName = InetAddress.getLocalHost().getHostName(); + zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT); + assertNotNull(hostName); + assertNotNull(zkPort); + } catch (Exception e) { + e.printStackTrace(); + } + + tableSpaceUri = "hbase:zk://" + hostName + ":" + zkPort; + HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri)); + hBaseTablespace.init(new TajoConf(testingCluster.getHBaseUtil().getConf())); + TablespaceManager.addTableSpaceForTest(hBaseTablespace); + } + + @AfterClass + public static void afterClass() { + try { + testingCluster.getHBaseUtil().stopHBaseCluster(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testVerifyCreateHBaseTableRequiredMeta() throws Exception { + try { + executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) TABLESPACE cluster1 USING hbase").close(); + fail("hbase table must have 'table' meta"); + } catch (TajoException e) { + assertEquals(e.getErrorCode(), ResultCode.MISSING_TABLE_PROPERTY); + } + + try { + executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) TABLESPACE cluster1 " + + "USING hbase " + + "WITH ('table'='hbase_table')").close(); + + fail("hbase table must have 'columns' meta"); + } catch (TajoException e) { + assertEquals(e.getErrorCode(), ResultCode.MISSING_TABLE_PROPERTY); + } + } + + @Test + public void testCreateHBaseTable() throws Exception { + executeString( + "CREATE TABLE hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:a,col3:,col2:b')").close(); + + 
assertTableExists("hbase_mapped_table1"); + + HTableDescriptor hTableDesc = testingCluster.getHBaseUtil().getTableDescriptor("hbase_table"); + assertNotNull(hTableDesc); + assertEquals("hbase_table", hTableDesc.getNameAsString()); + + HColumnDescriptor[] hColumns = hTableDesc.getColumnFamilies(); + // col1 is mapped to rowkey + assertEquals(2, hColumns.length); + assertEquals("col2", hColumns[0].getNameAsString()); + assertEquals("col3", hColumns[1].getNameAsString()); + + executeString("DROP TABLE hbase_mapped_table1 PURGE").close(); + + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + try { + assertFalse(hAdmin.tableExists("hbase_table")); + } finally { + hAdmin.close(); + } + } + + @Test + public void testCreateNotExistsExternalHBaseTable() throws Exception { + String sql = String.format( + "CREATE EXTERNAL TABLE external_hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:a,col3:,col2:b') " + + "LOCATION '%s/external_hbase_table'", tableSpaceUri); + try { + executeString(sql).close(); + fail("External table should be a existed table."); + } catch (Throwable e) { + assertTrue(e.getMessage().indexOf("External table should be a existed table.") >= 0); + } + } + + @Test + public void testCreateRowFieldWithNonText() throws Exception { + try { + executeString("CREATE TABLE hbase_mapped_table2 (rk1 int4, rk2 text, col3 text, col4 text) " + + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'='0:key#b,1:key,col3:,col2:b', " + + "'hbase.rowkey.delimiter'='_')").close(); + fail("Key field type should be TEXT type"); + } catch (Exception e) { + assertTrue(e.getMessage().indexOf("Key field type should be TEXT type") >= 0); + } + } + + @Test + public void testCreateExternalHBaseTable() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table_not_purge")); + 
hTableDesc.addFamily(new HColumnDescriptor("col1")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String sql = String.format( + "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='external_hbase_table_not_purge', 'columns'=':key,col1:a,col2:,col3:b') " + + "LOCATION '%s/external_hbase_table'", tableSpaceUri); + executeString(sql).close(); + + assertTableExists("external_hbase_mapped_table"); + + executeString("DROP TABLE external_hbase_mapped_table").close(); + + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + try { + assertTrue(hAdmin.tableExists("external_hbase_table_not_purge")); + hAdmin.disableTable("external_hbase_table_not_purge"); + hAdmin.deleteTable("external_hbase_table_not_purge"); + } finally { + hAdmin.close(); + } + } + + @Test + public void testSimpleSelectQuery() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table")); + hTableDesc.addFamily(new HColumnDescriptor("col1")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String sql = String.format( + "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b') " + + "LOCATION '%s/external_hbase_table'", tableSpaceUri); + executeString(sql).close(); + + assertTableExists("external_hbase_mapped_table"); + + HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get(); + HConnection hconn = space.getConnection(); + HTableInterface htable = hconn.getTable("external_hbase_table"); + + try { + for (int i = 0; i < 100; i++) { + Put put = new 
Put(String.valueOf(i).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + htable.put(put); + } + + ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); + htable.close(); + } + } + + @Test + public void testBinaryMappedQuery() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table")); + hTableDesc.addFamily(new HColumnDescriptor("col1")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String sql = String.format( + "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk int8, col1 text, col2 text, col3 int4)\n " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key#b,col1:a,col2:,col3:b#b') " + + "LOCATION '%s/external_hbase_table'", tableSpaceUri); + executeString(sql).close(); + + assertTableExists("external_hbase_mapped_table"); + + HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get(); + HConnection hconn = space.getConnection(); + HTableInterface htable = hconn.getTable("external_hbase_table"); + + try { + for (int i = 0; i < 100; i++) { + Put put = new Put(Bytes.toBytes((long) i)); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + 
put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes(i)); + htable.put(put); + } + + ResultSet res = executeString("select * from external_hbase_mapped_table where rk > 20"); + assertResultSet(res); + res.close(); + + //Projection + res = executeString("select col3, col2, rk from external_hbase_mapped_table where rk > 95"); + + String expected = "col3,col2,rk\n" + + "-------------------------------\n" + + "96,{\"k1\":\"k1-96\", \"k2\":\"k2-96\"},96\n" + + "97,{\"k1\":\"k1-97\", \"k2\":\"k2-97\"},97\n" + + "98,{\"k1\":\"k1-98\", \"k2\":\"k2-98\"},98\n" + + "99,{\"k1\":\"k1-99\", \"k2\":\"k2-99\"},99\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + + } finally { + executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); + htable.close(); + } + } + + @Test + public void testColumnKeyValueSelectQuery() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String sql = String.format( + "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, col2_key text, col2_value text, col3 text) " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " + + "'hbase.rowkey.delimiter'='_') LOCATION '%s/external_hbase_table'", tableSpaceUri); + executeString(sql).close(); + + assertTableExists("external_hbase_mapped_table"); + + HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get(); + HConnection hconn = space.getConnection(); + HTableInterface htable = hconn.getTable("external_hbase_table"); + + try { + for (int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes("rk-" + i)); + for (int j = 0; j < 5; j++) { + put.add("col2".getBytes(), ("key-" + j).getBytes(), Bytes.toBytes("value-" + j)); + } + put.add("col3".getBytes(), 
"".getBytes(), ("col3-value-" + i).getBytes()); + htable.put(put); + } + + ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 >= 'rk-0'"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); + htable.close(); + } + } + + @Test + public void testRowFieldSelectQuery() throws Exception { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String sql = String.format( + "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, rk2 text, col3 text) " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'='0:key,1:key,col3:a', " + + "'hbase.rowkey.delimiter'='_') LOCATION '%s/external_hbase_table'", tableSpaceUri); + executeString(sql).close(); + + assertTableExists("external_hbase_mapped_table"); + + HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get(); + HConnection hconn = space.getConnection(); + HTableInterface htable = hconn.getTable("external_hbase_table"); + + try { + for (int i = 0; i < 100; i++) { + Put put = new Put(("field1-" + i + "_field2-" + i).getBytes()); + put.add("col3".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + htable.put(put); + } + + ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 > 'field1-20'"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); + htable.close(); + } + } + + @Test + public void testIndexPredication() throws Exception { + String sql = + "CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " + + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " + + "'hbase.split.rowkeys'='010,040,060,080') "; + 
executeString(sql).close(); + + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + hAdmin.tableExists("hbase_table"); + + HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + try { + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + htable.put(put); + } + assertIndexPredication(false); + + ResultSet res = executeString("select * from hbase_mapped_table where rk >= '020' and rk <= '055'"); + assertResultSet(res); + res.close(); + + res = executeString("select * from hbase_mapped_table where rk = '021'"); + String expected = "rk,col1,col2,col3\n" + + "-------------------------------\n" + + "021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + htable.close(); + hAdmin.close(); + } + } + + @Test + public void testCompositeRowIndexPredication() throws Exception { + + executeString("CREATE TABLE hbase_mapped_table (rk text, rk2 text, col1 text, col2 text, col3 text) " + + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " + + "'hbase.split.rowkeys'='010,040,060,080', " + + "'hbase.rowkey.delimiter'='_')").close(); + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new 
HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + hAdmin.tableExists("hbase_table"); + + HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + try { + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put((df.format(i) + "_" + df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + htable.put(put); + } + + Scan scan = new Scan(); + scan.setStartRow("021".getBytes()); + scan.setStopRow(("021_" + new String(new char[]{Character.MAX_VALUE})).getBytes()); + Filter filter = new InclusiveStopFilter(scan.getStopRow()); + scan.setFilter(filter); + + ResultScanner scanner = htable.getScanner(scan); + Result result = scanner.next(); + assertNotNull(result); + assertEquals("021_021", new String(result.getRow())); + scanner.close(); + + assertIndexPredication(true); + + ResultSet res = executeString("select * from hbase_mapped_table where rk = '021'"); + String expected = "rk,rk2,col1,col2,col3\n" + + "-------------------------------\n" + + "021,021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + htable.close(); + hAdmin.close(); + } + } + + private void assertIndexPredication(boolean isCompositeRowKey) throws Exception { + String postFix = isCompositeRowKey ? 
"_" + new String(new char[]{Character.MAX_VALUE}) : ""; + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + ScanNode scanNode = new ScanNode(1); + + // where rk = '021' + EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("021"))); + scanNode.setQual(evalNodeEq); + Tablespace tablespace = TablespaceManager.getByName("cluster1").get(); + List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode); + assertEquals(1, fragments.size()); + assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow())); + assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow())); + + // where rk >= '020' and rk <= '055' + EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("020"))); + EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("055"))); + EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2); + scanNode.setQual(evalNodeA); + + fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode); + assertEquals(2, fragments.size()); + HBaseFragment fragment1 = (HBaseFragment) fragments.get(0); + assertEquals("020", new String(fragment1.getStartRow())); + assertEquals("040", new String(fragment1.getStopRow())); + + HBaseFragment fragment2 = (HBaseFragment) fragments.get(1); + assertEquals("040", new String(fragment2.getStartRow())); + assertEquals("055" + postFix, new String(fragment2.getStopRow())); + + // where (rk >= '020' and rk <= '055') or rk = '075' + EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("075"))); + EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, 
evalNode3); + scanNode.setQual(evalNodeB); + fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode); + assertEquals(3, fragments.size()); + fragment1 = (HBaseFragment) fragments.get(0); + assertEquals("020", new String(fragment1.getStartRow())); + assertEquals("040", new String(fragment1.getStopRow())); + + fragment2 = (HBaseFragment) fragments.get(1); + assertEquals("040", new String(fragment2.getStartRow())); + assertEquals("055" + postFix, new String(fragment2.getStopRow())); + + HBaseFragment fragment3 = (HBaseFragment) fragments.get(2); + assertEquals("075", new String(fragment3.getStartRow())); + assertEquals("075" + postFix, new String(fragment3.getStopRow())); + + + // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078') + EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("072"))); + EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("078"))); + EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5); + EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC); + scanNode.setQual(evalNodeD); + fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode); + assertEquals(3, fragments.size()); + + fragment1 = (HBaseFragment) fragments.get(0); + assertEquals("020", new String(fragment1.getStartRow())); + assertEquals("040", new String(fragment1.getStopRow())); + + fragment2 = (HBaseFragment) fragments.get(1); + assertEquals("040", new String(fragment2.getStartRow())); + assertEquals("055" + postFix, new String(fragment2.getStopRow())); + + fragment3 = (HBaseFragment) fragments.get(2); + assertEquals("072", new String(fragment3.getStartRow())); + assertEquals("078" + postFix, new String(fragment3.getStopRow())); + + // where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059') + evalNode4 = new 
BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("057"))); + evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")), + new ConstEval(new TextDatum("059"))); + evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5); + evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC); + scanNode.setQual(evalNodeD); + fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode); + assertEquals(2, fragments.size()); + + fragment1 = (HBaseFragment) fragments.get(0); + assertEquals("020", new String(fragment1.getStartRow())); + assertEquals("040", new String(fragment1.getStopRow())); + + fragment2 = (HBaseFragment) fragments.get(1); + assertEquals("040", new String(fragment2.getStartRow())); + assertEquals("059" + postFix, new String(fragment2.getStopRow())); + } + + @Test + public void testNonForwardQuery() throws Exception { + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int) " + + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:#b', " + + "'hbase.split.rowkeys'='010,040,060,080')").close(); + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + HTable htable = null; + try { + hAdmin.tableExists("hbase_table"); + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + 
put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "".getBytes(), Bytes.toBytes(i)); + htable.put(put); + } + + ResultSet res = executeString("select * from hbase_mapped_table"); + assertResultSet(res); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + hAdmin.close(); + if (htable == null) { + htable.close(); + } + } + } + + @Test + public void testJoin() throws Exception { + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int8) " + + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " + + "'hbase.split.rowkeys'='010,040,060,080')").close(); + + assertTableExists("hbase_mapped_table"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + HTable htable = null; + try { + hAdmin.tableExists("hbase_table"); + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(5, keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes((long) i)); + htable.put(put); + } + + ResultSet res = executeString("select a.rk, a.col1, a.col2, a.col3, b.l_orderkey, b.l_linestatus " + + "from hbase_mapped_table a " + + "join default.lineitem b on a.col3 = b.l_orderkey"); + assertResultSet(res); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + hAdmin.close(); + if (htable != 
null) { + htable.close(); + } + } + } + + @Test + public void testInsertInto() throws Exception { + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " + + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + executeString("insert into hbase_mapped_table " + + "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scan.addFamily(Bytes.toBytes("col2")); + scan.addFamily(Bytes.toBytes("col3")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")}, + new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")}, + new boolean[]{false, false, false, true}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertValues1() throws Exception { + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " + + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + executeString("insert into hbase_mapped_table select 'aaa', 'a12', 'a34', 1").close(); + executeString("insert into hbase_mapped_table select 'bbb', 'b12', 'b34', 2").close(); + 
executeString("insert into hbase_mapped_table select 'ccc', 'c12', 'c34', 3").close(); + executeString("insert into hbase_mapped_table select 'ddd', 'd12', 'd34', 4").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scan.addFamily(Bytes.toBytes("col2")); + scan.addFamily(Bytes.toBytes("col3")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")}, + new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")}, + new boolean[]{false, false, false, true}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegion() throws Exception { + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys'='010,040,060,080')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + DecimalFormat df = new DecimalFormat("000"); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, 
tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegion2() throws Exception { + executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + for (int i = 99; i >= 0; i--) { + datas.add(i + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = 
null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegionWithSplitFile() throws Exception { + String splitFilePath = currentDatasetPath + "/splits.data"; + + executeString( + "CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys.file'='" + splitFilePath + "')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + DecimalFormat df = new DecimalFormat("000"); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new 
HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoMultiRegionMultiRowFields() throws Exception { + executeString( + "CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a', " + + "'hbase.split.rowkeys'='001,002,003,004,005,006,007,008,009', " + + "'hbase.rowkey.delimiter'='_')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id1", Type.TEXT); + schema.addColumn("id2", Type.TEXT); + schema.addColumn("name", Type.TEXT); + DecimalFormat df = new DecimalFormat("000"); + List<String> datas = new ArrayList<String>(); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|" + (i + 100) + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id1, id2, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable 
= new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, null, Bytes.toBytes("col1")}, + new byte[][]{null, null, Bytes.toBytes("a")}, + new boolean[]{false, false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoBinaryMultiRegion() throws Exception { + executeString("CREATE TABLE hbase_mapped_table (rk int4, col1 text) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key#b,col1:a', " + + "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + for (int i = 99; i >= 0; i--) { + datas.add(i + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + 
+ assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{true, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoColumnKeyValue() throws Exception { + executeString( + "CREATE TABLE hbase_mapped_table (rk text, col2_key text, col2_value text, col3 text) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " + + "'hbase.rowkey.delimiter'='_')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("rk", Type.TEXT); + schema.addColumn("col2_key", Type.TEXT); + schema.addColumn("col2_value", Type.TEXT); + schema.addColumn("col3", Type.TEXT); + List<String> datas = new ArrayList<String>(); + for (int i = 20; i >= 0; i--) { + for (int j = 0; j < 3; j++) { + datas.add(i + "|ck-" + j + "|value-" + j + "|col3-" + i); + } + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into hbase_mapped_table " + + "select rk, col2_key, col2_value, col3 from base_table ").close(); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col2")); + 
scan.addFamily(Bytes.toBytes("col3")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col2"), Bytes.toBytes("col3")}, + new byte[][]{null, null, null}, + new boolean[]{false, false, false}, tableDesc.getSchema())); + + ResultSet res = executeString("select * from hbase_mapped_table"); + + String expected = "rk,col2_key,col2_value,col3\n" + + "-------------------------------\n" + + "0,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-0\n" + + "1,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-1\n" + + "10,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-10\n" + + "11,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-11\n" + + "12,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-12\n" + + "13,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-13\n" + + "14,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-14\n" + + "15,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-15\n" + + "16,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-16\n" + + "17,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-17\n" + + "18,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-18\n" + + "19,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-19\n" + + "2,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-2\n" + + "20,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-20\n" + + "3,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-3\n" + + "4,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-4\n" + + "5,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-5\n" + + "6,[\"ck-0\", \"ck-1\", 
\"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-6\n" + + "7,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-7\n" + + "8,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-8\n" + + "9,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-9\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoDifferentType() throws Exception { + executeString( + "CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9')").close(); + + assertTableExists("hbase_mapped_table"); + + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<String>(); + for (int i = 99; i >= 0; i--) { + datas.add(i + "|value" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + try { + executeString("insert into hbase_mapped_table " + + "select id, name from base_table ").close(); + fail("If inserting data type different with target table data type, should throw exception"); + } catch (TajoException e) { + assertEquals(ResultCode.DATATYPE_MISMATCH, e.getErrorCode()); + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + } + 
} + + @Test + public void testInsertIntoRowField() throws Exception { + executeString( + "CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text, col2 text, col3 text) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " + + "'hbase.rowkey.delimiter'='_')").close(); + + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + executeString("insert into hbase_mapped_table " + + "select l_orderkey::text, l_partkey::text, l_shipdate, l_returnflag, l_suppkey::text from default.lineitem "); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scan.addFamily(Bytes.toBytes("col2")); + scan.addFamily(Bytes.toBytes("col3")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")}, + new byte[][]{null, Bytes.toBytes("a"), Bytes.toBytes(""), Bytes.toBytes("b")}, + new boolean[]{false, false, false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testCTAS() throws Exception { + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + List<String> datas = new ArrayList<>(); + DecimalFormat df = new DecimalFormat("000"); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|value" + i); 
+ } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString( + "CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " + + "'hbase.split.rowkeys'='010,040,060,080') as" + + " select id, name from base_table" + ).close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + HTable htable = null; + ResultScanner scanner = null; + try { + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scanner = htable.getScanner(scan); + + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1")}, + new byte[][]{null, Bytes.toBytes("a")}, + new boolean[]{false, false}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + + // TODO - rollback should support its corresponding hbase table + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + if (hAdmin.tableExists("hbase_table")) { + hAdmin.disableTable("hbase_table"); + hAdmin.deleteTable("hbase_table"); + } + } + } + + @Test + public void testInsertIntoUsingPut() throws Exception { + executeString( + "CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b')").close(); + + assertTableExists("hbase_mapped_table"); + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table"); + + Map<String, String> sessions = new HashMap<>(); + 
sessions.put(HBaseStorageConstants.INSERT_PUT_MODE, "true"); + client.updateSessionVariables(sessions); + + HTable htable = null; + ResultScanner scanner = null; + try { + executeString( + "insert into hbase_mapped_table " + + "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem" + ).close(); + + htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table"); + + Scan scan = new Scan(); + scan.addFamily(Bytes.toBytes("col1")); + scan.addFamily(Bytes.toBytes("col2")); + scan.addFamily(Bytes.toBytes("col3")); + scanner = htable.getScanner(scan); + + // result is different from testInsertInto because l_orderkey is not unique. + assertStrings(resultSetToString(scanner, + new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")}, + new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")}, + new boolean[]{false, false, false, true}, tableDesc.getSchema())); + + } finally { + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + + client.unsetSessionVariables(TUtil.newList(HBaseStorageConstants.INSERT_PUT_MODE)); + + if (scanner != null) { + scanner.close(); + } + + if (htable != null) { + htable.close(); + } + } + } + + @Test + public void testInsertIntoLocation() throws Exception { + executeString( + "CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text) TABLESPACE cluster1 " + + "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:', " + + "'hbase.split.rowkeys'='010,040,060,080')").close(); + + assertTableExists("hbase_mapped_table"); + + try { + // create test table + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.TEXT); + schema.addColumn("name", Type.TEXT); + schema.addColumn("comment", Type.TEXT); + List<String> datas = new 
ArrayList<>(); + DecimalFormat df = new DecimalFormat("000"); + for (int i = 99; i >= 0; i--) { + datas.add(df.format(i) + "|value" + i + "|comment-" + i); + } + TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", + schema, tableOptions, datas.toArray(new String[]{}), 2); + + executeString("insert into location '/tmp/hfile_test' " + + "select id, name, comment from base_table ").close(); + + FileSystem fs = testingCluster.getDefaultFileSystem(); + Path path = new Path("/tmp/hfile_test"); + assertTrue(fs.exists(path)); + + FileStatus[] files = fs.listStatus(path); + assertNotNull(files); + assertEquals(2, files.length); + + int index = 0; + for (FileStatus eachFile: files) { + assertEquals("/tmp/hfile_test/part-01-00000" + index + "-00" + index, eachFile.getPath().toUri().getPath()); + for (FileStatus subFile: fs.listStatus(eachFile.getPath())) { + assertTrue(subFile.isFile()); + assertTrue(subFile.getLen() > 0); + } + index++; + } + } finally { + executeString("DROP TABLE base_table PURGE").close(); + executeString("DROP TABLE hbase_mapped_table PURGE").close(); + } + } + + private String resultSetToString(ResultScanner scanner, byte[][] cfNames, byte[][] qualifiers, + boolean [] binaries, Schema schema) throws Exception { + StringBuilder sb = new StringBuilder(); + Result result = null; + while ( (result = scanner.next()) != null ) { + if (binaries[0]) { + sb.append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(0), result.getRow()).asChar()); + } else { + sb.append(new String(result.getRow())); + } + + for (int i = 0; i < cfNames.length; i++) { + if (cfNames[i] == null) { + //rowkey + continue; + } + if (qualifiers[i] == null) { + Map<byte[], byte[]> values = result.getFamilyMap(cfNames[i]); + if (values == null) { + sb.append(", null"); + } else { + sb.append(", {"); + String delim = ""; + for (Map.Entry<byte[], byte[]> valueEntry: values.entrySet()) { + byte[] keyBytes = valueEntry.getKey(); + byte[] valueBytes = 
valueEntry.getValue(); + + if (binaries[i]) { + sb.append(delim).append("\"").append(keyBytes == null ? "" : Bytes.toLong(keyBytes)).append("\""); + sb.append(": \"").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\""); + } else { + sb.append(delim).append("\"").append(keyBytes == null ? "" : new String(keyBytes)).append("\""); + sb.append(": \"").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\""); + } + delim = ", "; + } + sb.append("}"); + } + } else { + byte[] value = result.getValue(cfNames[i], qualifiers[i]); + if (value == null) { + sb.append(", null"); + } else { + if (binaries[i]) { + sb.append(", ").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), value)); + } else { + sb.append(", ").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), value)); + } + } + } + } + sb.append("\n"); + } + + return sb.toString(); + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInSubquery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInSubquery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInSubquery.java new file mode 100644 index 0000000..fe465f1 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInSubquery.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.query; + +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.NamedTest; +import org.apache.tajo.error.Errors.ResultCode; +import org.apache.tajo.exception.TajoRuntimeException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.sql.SQLException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@Category(IntegrationTest.class) +@RunWith(Parameterized.class) +@NamedTest("TestJoinQuery") +public class TestInSubquery extends TestJoinQuery { + + public TestInSubquery(String joinOption) throws Exception { + super(joinOption); + } + + @BeforeClass + public static void setup() throws Exception { + TestJoinQuery.setup(); + } + + @AfterClass + public static void classTearDown() throws SQLException { + TestJoinQuery.classTearDown(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testInSubQuery() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testInSubQuery2() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testNestedInSubQuery() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testInSubQueryWithOtherConditions() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testMultipleInSubQuery() throws 
Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testInSubQueryWithJoin() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testInSubQueryWithTableSubQuery() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testNotInSubQuery() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testMultipleNotInSubQuery() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testNestedNotInSubQuery() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testInAndNotInSubQuery() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testNestedInAndNotInSubQuery() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testNestedInSubQuery2() throws Exception { + // select c_name from customer + // where c_nationkey in ( + // select n_nationkey from nation where n_name like 'C%' and n_regionkey in ( + // select count(*)-1 from region where r_regionkey > 0 and r_regionkey < 3)) + runSimpleTests(); + } + + @Test() + public final void testCorrelatedSubQuery() throws Exception 
{ + // Use try-catch clause to verify the exact error message + try { + executeString("select * from nation where n_regionkey in (select r_regionkey from region where n_name > r_name)"); + fail("Correlated subquery must raise the UnimplementedException."); + } catch (TajoRuntimeException e) { + assertEquals(ResultCode.NOT_IMPLEMENTED, e.getErrorCode()); + } + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public final void testSameKeyNameOfOuterAndInnerQueries() throws Exception { + runSimpleTests(); + } + + @Test + @Option(parameterized = true, sort = true) + @SimpleTest() + public final void testWithAsteriskAndJoin() throws Exception { + // select * from lineitem, orders where l_orderkey = o_orderkey and l_partkey in + // (select l_partkey from lineitem where l_linenumber in (1, 3, 5, 7, 9)) + runSimpleTests(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestIndexScan.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestIndexScan.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestIndexScan.java new file mode 100644 index 0000000..a255c6d --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestIndexScan.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.query; + +import com.google.protobuf.ServiceException; +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.SessionVars; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.exception.NoSuchSessionVariableException; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +@Category(IntegrationTest.class) +public class TestIndexScan extends QueryTestCaseBase { + + public TestIndexScan() throws ServiceException, SQLException, NoSuchSessionVariableException { + super(TajoConstants.DEFAULT_DATABASE_NAME); + Map<String,String> sessionVars = new HashMap<String, String>(); + sessionVars.put(SessionVars.INDEX_ENABLED.keyname(), "true"); + sessionVars.put(SessionVars.INDEX_SELECTIVITY_THRESHOLD.keyname(), "0.01f"); + client.updateSessionVariables(sessionVars); + } + + @Test + public final void testOnSortedNonUniqueKeys() throws Exception { + executeString("create index l_orderkey_idx on lineitem (l_orderkey)"); + try { + ResultSet res = executeString("select * from lineitem where l_orderkey = 1;"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("drop index l_orderkey_idx"); + } + } + + @Test + public final void testOnUnsortedTextKeys() throws Exception { + executeString("create index l_shipdate_idx on lineitem (l_shipdate)"); + try { + ResultSet res = executeString("select l_orderkey, l_shipdate, 
l_comment from lineitem where l_shipdate = '1997-01-28';"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("drop index l_shipdate_idx"); + } + } + + @Test + public final void testOnMultipleKeys() throws Exception { + executeString("create index multikey_idx on lineitem (l_shipdate asc null last, l_tax desc null first, l_shipmode, l_linenumber desc null last)"); + try { + ResultSet res = executeString("select l_orderkey, l_shipdate, l_comment from lineitem " + + "where l_shipdate = '1997-01-28' and l_tax = 0.05 and l_shipmode = 'RAIL' and l_linenumber = 1;"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("drop index multikey_idx"); + } + } + + @Test + public final void testOnMultipleKeys2() throws Exception { + executeString("create index multikey_idx on lineitem (l_shipdate asc null last, l_tax desc null first)"); + try { + ResultSet res = executeString("select l_orderkey, l_shipdate, l_comment from lineitem " + + "where l_shipdate = '1997-01-28' and l_tax = 0.05 and l_shipmode = 'RAIL' and l_linenumber = 1;"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("drop index multikey_idx"); + } + } + + @Test + public final void testOnMultipleExprs() throws Exception { + executeString("create index l_orderkey_100_l_linenumber_10_idx on lineitem (l_orderkey*100-l_linenumber*10 asc null first);"); + try { + ResultSet res = executeString("select l_orderkey, l_linenumber from lineitem where l_orderkey*100-l_linenumber*10 = 280"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("drop index l_orderkey_100_l_linenumber_10_idx"); + } + } + + @Test + public final void testWithGroupBy() throws Exception { + executeString("create index l_shipdate_idx on lineitem (l_shipdate)"); + try { + ResultSet res = executeString("select l_shipdate, count(*) from lineitem where l_shipdate = '1997-01-28' group by l_shipdate;"); + assertResultSet(res); + cleanupQuery(res); + } finally { 
+ executeString("drop index l_shipdate_idx"); + } + } + + @Test + public final void testWithSort() throws Exception { + executeString("create index l_orderkey_idx on lineitem (l_orderkey)"); + try { + ResultSet res = executeString("select l_shipdate from lineitem where l_orderkey = 1 order by l_shipdate;"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("drop index l_orderkey_idx"); + } + } + + @Test + public final void testWithJoin() throws Exception { + executeString("create index l_orderkey_idx on lineitem (l_orderkey)"); + executeString("create index o_orderkey_idx on orders (o_orderkey)"); + try { + ResultSet res = executeString("select l_shipdate, o_orderstatus from lineitem, orders where l_orderkey = o_orderkey and l_orderkey = 1 and o_orderkey = 1;"); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("drop index l_orderkey_idx"); + executeString("drop index o_orderkey_idx"); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinQuery.java new file mode 100644 index 0000000..88b3548 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinQuery.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.query; + +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.NamedTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.sql.ResultSet; +import java.sql.SQLException; + +@Category(IntegrationTest.class) +@RunWith(Parameterized.class) +@NamedTest("TestJoinQuery") +public class TestInnerJoinQuery extends TestJoinQuery { + + public TestInnerJoinQuery(String joinOption) throws Exception { + super(joinOption); + } + + @BeforeClass + public static void setup() throws Exception { + TestJoinQuery.setup(); + } + + @AfterClass + public static void classTearDown() throws SQLException { + TestJoinQuery.classTearDown(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true, sort = true) + @SimpleTest(queries = { + @QuerySpec("select n_name, r_name, n_regionkey, r_regionkey from nation, region order by n_name, r_name"), + // testCrossJoinWithAsterisk + @QuerySpec("select region.*, customer.* from region, customer"), + @QuerySpec("select region.*, customer.* from customer, region"), + @QuerySpec("select * from customer, region"), + @QuerySpec("select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name") + }) + public final void testCrossJoin() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, 
parameterized = true, sort = true) + @SimpleTest() + public final void testCrossJoinWithThetaJoinConditionInWhere() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testInnerJoinWithThetaJoinConditionInWhere() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testWhereClauseJoin1() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testWhereClauseJoin2() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testWhereClauseJoin3() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testWhereClauseJoin4() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testWhereClauseJoin5() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testWhereClauseJoin6() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testTPCHQ2Join() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testJoinWithMultipleJoinQual1() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + 
public void testJoinCoReferredEvals1() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testJoinCoReferredEvalsWithSameExprs1() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testJoinCoReferredEvalsWithSameExprs2() throws Exception { + // including grouping operator + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testInnerJoinAndCaseWhen() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testInnerJoinWithEmptyTable() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testCrossJoinWithEmptyTable1() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest(prepare = { + "CREATE DATABASE JOINS", + "CREATE TABLE JOINS.part_ as SELECT * FROM part", + "CREATE TABLE JOINS.supplier_ as SELECT * FROM supplier" + }, cleanup = { + "DROP TABLE JOINS.part_ PURGE", + "DROP TABLE JOINS.supplier_ PURGE", + "DROP DATABASE JOINS" + }) + public final void testJoinOnMultipleDatabases() throws Exception { + runSimpleTests(); + } + + @Test + public final void testJoinWithJson() throws Exception { + // select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name + ResultSet res = executeJsonQuery(); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public final void testJoinOnMultipleDatabasesWithJson() throws Exception { + executeString("CREATE DATABASE JOINS"); + assertDatabaseExists("joins"); + 
executeString("CREATE TABLE JOINS.part_ as SELECT * FROM part"); + assertTableExists("joins.part_"); + executeString("CREATE TABLE JOINS.supplier_ as SELECT * FROM supplier"); + assertTableExists("joins.supplier_"); + + try { + ResultSet res = executeJsonQuery(); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE JOINS.part_ PURGE"); + executeString("DROP TABLE JOINS.supplier_ PURGE"); + executeString("DROP DATABASE JOINS"); + } + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testJoinAsterisk() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testDifferentTypesJoinCondition() throws Exception { + // select * from table20 t3 join table21 t4 on t3.id = t4.id; + executeDDL("table1_int8_ddl.sql", "table1", "table20"); + executeDDL("table1_int4_ddl.sql", "table1", "table21"); + try { + runSimpleTests(); + } finally { + executeString("DROP TABLE table20"); + executeString("DROP TABLE table21"); + } + } + + @Test + @SimpleTest + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + public void testComplexJoinCondition1() throws Exception { + // select n1.n_nationkey, n1.n_name, n2.n_name from nation n1 join nation n2 on n1.n_name = upper(n2.n_name); + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testComplexJoinCondition2() throws Exception { + // select n1.n_nationkey, n1.n_name, upper(n2.n_name) name from nation n1 join nation n2 + // on n1.n_name = upper(n2.n_name); + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testComplexJoinCondition3() throws Exception { + // select n1.n_nationkey, n1.n_name, n2.n_name from nation n1 join nation n2 
on lower(n1.n_name) = lower(n2.n_name); + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testComplexJoinCondition4() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testJoinWithOrPredicates() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testNaturalJoin() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testCrossJoinAndCaseWhen() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testCrossJoinWithAsterisk1() throws Exception { + // select region.*, customer.* from region, customer; + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testCrossJoinWithAsterisk2() throws Exception { + // select region.*, customer.* from customer, region; + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testCrossJoinWithAsterisk3() throws Exception { + // select * from customer, region + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testCrossJoinWithAsterisk4() throws Exception { + // select length(r_regionkey), *, c_custkey*10 from customer, region + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testBroadcastTwoPartJoin() throws Exception { + runSimpleTests(); + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java new file mode 100644 index 0000000..6c9546e --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.query; + +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.NamedTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.sql.ResultSet; +import java.sql.SQLException; + +@Category(IntegrationTest.class) +@RunWith(Parameterized.class) +@NamedTest("TestJoinQuery") +public class TestInnerJoinWithSubQuery extends TestJoinQuery { + + public TestInnerJoinWithSubQuery(String joinOption) throws Exception { + super(joinOption); + } + + @BeforeClass + public static void setup() throws Exception { + TestJoinQuery.setup(); + } + + @AfterClass + public static void classTearDown() throws SQLException { + TestJoinQuery.classTearDown(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testJoinWithMultipleJoinQual2() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testJoinWithMultipleJoinQual3() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testJoinWithMultipleJoinQual4() throws Exception { + runSimpleTests(); + } + + @Test + public final void testJoinWithJson2() throws Exception { + /* + select t.n_nationkey, t.n_name, t.n_regionkey, t.n_comment, ps.ps_availqty, s.s_suppkey + from ( + select n_nationkey, n_name, n_regionkey, n_comment + from nation n + join region r on (n.n_regionkey = r.r_regionkey) + ) t + join supplier s on (s.s_nationkey = t.n_nationkey) + join partsupp ps on (s.s_suppkey = ps.ps_suppkey) + where t.n_name in ('ARGENTINA','ETHIOPIA', 'MOROCCO'); + */ + ResultSet res = executeJsonQuery(); + assertResultSet(res); + 
cleanupQuery(res); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testComplexJoinCondition5() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testComplexJoinCondition6() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testComplexJoinCondition7() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testBroadcastSubquery() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testBroadcastSubquery2() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public final void testThetaJoinKeyPairs() throws Exception { + runSimpleTests(); + } +}
