http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java new file mode 100644 index 0000000..d0088a5 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java @@ -0,0 +1,882 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.query; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.DeflateCodec; +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.sql.ResultSet; +import java.util.List; + +import static org.junit.Assert.*; + +@Category(IntegrationTest.class) +public class TestInsertQuery extends QueryTestCaseBase { + + @Test + public final void testInsertOverwrite() throws Exception { + ResultSet res = executeFile("table1_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "table1")); + + res = executeFile("testInsertOverwrite.sql"); + res.close(); + + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1"); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(5, desc.getStats().getNumRows().intValue()); + } + + executeString("DROP TABLE table1 PURGE"); + } + + @Test + public final void testInsertInto() throws Exception { + // create table and upload test data + ResultSet res = executeFile("table1_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "table1")); + + res = executeFile("testInsertOverwrite.sql"); + res.close(); + + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1"); + if 
(!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(5, desc.getStats().getNumRows().intValue()); + } + + res = executeFile("testInsertInto.sql"); + res.close(); + + List<Path> dataFiles = listTableFiles("table1"); + assertEquals(2, dataFiles.size()); + + for (int i = 0; i < dataFiles.size(); i++) { + String name = dataFiles.get(i).getName(); + assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*")); + String[] tokens = name.split("-"); + assertEquals(4, tokens.length); + assertEquals(i, Integer.parseInt(tokens[3])); + } + + String tableDatas = getTableFileContents("table1"); + + String expected = "1|1|17.0\n" + + "1|1|36.0\n" + + "2|2|38.0\n" + + "3|2|45.0\n" + + "3|3|49.0\n" + + "1|1|17.0\n" + + "1|1|36.0\n" + + "2|2|38.0\n" + + "3|2|45.0\n" + + "3|3|49.0\n"; + + assertNotNull(tableDatas); + assertEquals(expected, tableDatas); + + executeString("DROP TABLE table1 PURGE"); + } + + @Test + public final void testInsertIntoLocation() throws Exception { + Path dfsPath = new Path("/tajo-data/testInsertIntoLocation"); + assertTestInsertIntoLocation(dfsPath); + } + + @Test + public final void testInsertIntoLocationDifferentFSs() throws Exception { + Path localPath = CommonTestingUtil.getTestDir(); + assertTestInsertIntoLocation(localPath); + } + + public final void assertTestInsertIntoLocation(Path path) throws Exception { + FileSystem fs = null; + + try { + executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber from default.lineitem").close(); + + String resultFileData = getTableFileContents(path); + String expected = "1|1|1\n" + + "1|1|2\n" + + "2|2|1\n" + + "3|2|1\n" + + "3|3|2\n"; + + assertEquals(expected, resultFileData); + + fs = path.getFileSystem(testingCluster.getConfiguration()); + + FileStatus[] files = fs.listStatus(path); + assertNotNull(files); + assertEquals(1, files.length); + + for (FileStatus eachFileStatus : files) { + String name = eachFileStatus.getPath().getName(); + 
assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*")); + } + + executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber from default.lineitem").close(); + resultFileData = getTableFileContents(path); + expected = "1|1|1\n" + + "1|1|2\n" + + "2|2|1\n" + + "3|2|1\n" + + "3|3|2\n"; + + assertEquals(expected + expected, resultFileData); + + files = fs.listStatus(path); + assertNotNull(files); + assertEquals(2, files.length); + + for (FileStatus eachFileStatus : files) { + String name = eachFileStatus.getPath().getName(); + assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*")); + } + } finally { + if (fs != null) { + fs.delete(path, true); + } + } + } + + @Test + public final void testInsertIntoPartitionedTable() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoPartitionedTable"); + executeString("create table " + tableName + " (n_name TEXT, n_regionkey INT4)" + + "USING csv PARTITION by column(n_nationkey INT4)" ).close(); + + try { + executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey from default.nation").close(); + + ResultSet res = executeString("select * from " + tableName); + + String expected = "n_name,n_regionkey,n_nationkey\n" + + "-------------------------------\n" + + "ALGERIA,0,0\n" + + "ARGENTINA,1,1\n" + + "IRAN,4,10\n" + + "IRAQ,4,11\n" + + "JAPAN,2,12\n" + + "JORDAN,4,13\n" + + "KENYA,0,14\n" + + "MOROCCO,0,15\n" + + "MOZAMBIQUE,0,16\n" + + "PERU,1,17\n" + + "CHINA,2,18\n" + + "ROMANIA,3,19\n" + + "BRAZIL,1,2\n" + + "SAUDI ARABIA,4,20\n" + + "VIETNAM,2,21\n" + + "RUSSIA,3,22\n" + + "UNITED KINGDOM,3,23\n" + + "UNITED STATES,1,24\n" + + "CANADA,1,3\n" + + "EGYPT,4,4\n" + + "ETHIOPIA,0,5\n" + + "FRANCE,3,6\n" + + "GERMANY,3,7\n" + + "INDIA,2,8\n" + + "INDONESIA,2,9\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + + executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey from 
default.nation").close(); + res = executeString("select * from " + tableName); + expected = "n_name,n_regionkey,n_nationkey\n" + + "-------------------------------\n" + + "ALGERIA,0,0\n" + + "ALGERIA,0,0\n" + + "ARGENTINA,1,1\n" + + "ARGENTINA,1,1\n" + + "IRAN,4,10\n" + + "IRAN,4,10\n" + + "IRAQ,4,11\n" + + "IRAQ,4,11\n" + + "JAPAN,2,12\n" + + "JAPAN,2,12\n" + + "JORDAN,4,13\n" + + "JORDAN,4,13\n" + + "KENYA,0,14\n" + + "KENYA,0,14\n" + + "MOROCCO,0,15\n" + + "MOROCCO,0,15\n" + + "MOZAMBIQUE,0,16\n" + + "MOZAMBIQUE,0,16\n" + + "PERU,1,17\n" + + "PERU,1,17\n" + + "CHINA,2,18\n" + + "CHINA,2,18\n" + + "ROMANIA,3,19\n" + + "ROMANIA,3,19\n" + + "BRAZIL,1,2\n" + + "BRAZIL,1,2\n" + + "SAUDI ARABIA,4,20\n" + + "SAUDI ARABIA,4,20\n" + + "VIETNAM,2,21\n" + + "VIETNAM,2,21\n" + + "RUSSIA,3,22\n" + + "RUSSIA,3,22\n" + + "UNITED KINGDOM,3,23\n" + + "UNITED KINGDOM,3,23\n" + + "UNITED STATES,1,24\n" + + "UNITED STATES,1,24\n" + + "CANADA,1,3\n" + + "CANADA,1,3\n" + + "EGYPT,4,4\n" + + "EGYPT,4,4\n" + + "ETHIOPIA,0,5\n" + + "ETHIOPIA,0,5\n" + + "FRANCE,3,6\n" + + "FRANCE,3,6\n" + + "GERMANY,3,7\n" + + "GERMANY,3,7\n" + + "INDIA,2,8\n" + + "INDIA,2,8\n" + + "INDONESIA,2,9\n" + + "INDONESIA,2,9\n"; + + assertEquals(expected, resultSetToString(res)); + + TableDesc tableDesc = testingCluster.getMaster().getCatalog().getTableDesc(getCurrentDatabase(), tableName); + assertNotNull(tableDesc); + + Path path = new Path(tableDesc.getUri()); + FileSystem fs = path.getFileSystem(testingCluster.getConfiguration()); + + FileStatus[] files = fs.listStatus(path); + assertNotNull(files); + assertEquals(25, files.length); + + for (FileStatus eachFileStatus: files) { + assertTrue(eachFileStatus.getPath().getName().indexOf("n_nationkey=") == 0); + FileStatus[] dataFiles = fs.listStatus(eachFileStatus.getPath()); + assertEquals(2, dataFiles.length); + for (FileStatus eachDataFileStatus: dataFiles) { + String name = eachDataFileStatus.getPath().getName(); + 
assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*")); + } + } + } finally { + executeString("DROP TABLE " + tableName + " PURGE"); + } + } + + @Test + public final void testInsertOverwriteSmallerColumns() throws Exception { + ResultSet res = executeFile("table1_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "table1")); + TableDesc originalDesc = catalog.getTableDesc(getCurrentDatabase(), "table1"); + + res = executeFile("testInsertOverwriteSmallerColumns.sql"); + res.close(); + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1"); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(5, desc.getStats().getNumRows().intValue()); + } + assertEquals(originalDesc.getSchema(), desc.getSchema()); + + executeString("DROP TABLE table1 PURGE"); + } + + @Test + public final void testInsertOverwriteWithTargetColumns() throws Exception { + ResultSet res = executeFile("table1_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "table1")); + TableDesc originalDesc = catalog.getTableDesc(getCurrentDatabase(), "table1"); + + res = executeFile("testInsertOverwriteWithTargetColumns.sql"); + res.close(); + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1"); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(5, desc.getStats().getNumRows().intValue()); + } + + res = executeString("select * from " + CatalogUtil.denormalizeIdentifier(getCurrentDatabase()) + ".table1"); + + assertTrue(res.next()); + assertEquals(1, res.getLong(1)); + assertTrue(0f == res.getFloat(2)); + assertTrue(res.wasNull()); + assertTrue(17.0 == res.getFloat(3)); + + assertTrue(res.next()); + assertEquals(1, res.getLong(1)); + assertTrue(0f == res.getFloat(2)); + assertTrue(res.wasNull()); + assertTrue(36.0 == res.getFloat(3)); + + 
assertTrue(res.next()); + assertEquals(2, res.getLong(1)); + assertTrue(0f == res.getFloat(2)); + assertTrue(res.wasNull()); + assertTrue(38.0 == res.getFloat(3)); + + assertTrue(res.next()); + assertTrue(0f == res.getFloat(2)); + assertTrue(res.wasNull()); + assertTrue(45.0 == res.getFloat(3)); + + assertTrue(res.next()); + assertEquals(3, res.getLong(1)); + assertTrue(0f == res.getFloat(2)); + assertTrue(res.wasNull()); + assertTrue(49.0 == res.getFloat(3)); + + assertFalse(res.next()); + res.close(); + + assertEquals(originalDesc.getSchema(), desc.getSchema()); + + executeString("DROP TABLE table1 PURGE"); + } + + @Test + public final void testInsertOverwriteWithAsterisk() throws Exception { + ResultSet res = executeFile("full_table_csv_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "full_table_csv")); + + res = executeString("insert overwrite into full_table_csv select * from default.lineitem where l_orderkey = 3"); + res.close(); + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "full_table_csv"); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(2, desc.getStats().getNumRows().intValue()); + } + executeString("DROP TABLE full_table_csv PURGE"); + } + + @Test + public final void testInsertOverwriteWithAsteriskAndMore() throws Exception { + ResultSet res = executeFile("lineitem_year_month_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "lineitem_year_month")); + + res = executeFile("load_to_lineitem_year_month.sql"); + res.close(); + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "lineitem_year_month"); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(5, desc.getStats().getNumRows().intValue()); + } + + res = executeQuery(); + assertResultSet(res); + res.close(); + + executeString("DROP TABLE 
lineitem_year_month PURGE"); + } + + @Test + public final void testInsertOverwriteIntoSelect() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("insertoverwriteintoselect"); + ResultSet res = executeString("create table " + tableName + " as select l_orderkey from default.lineitem"); + assertFalse(res.next()); + res.close(); + + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), tableName)); + TableDesc orderKeys = catalog.getTableDesc(getCurrentDatabase(), tableName); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(5, orderKeys.getStats().getNumRows().intValue()); + } + + // this query will result in the two rows. + res = executeString("insert overwrite into " + tableName + " select l_orderkey from default.lineitem where l_orderkey = 3"); + assertFalse(res.next()); + res.close(); + + assertTrue(catalog.existsTable(getCurrentDatabase(), tableName)); + orderKeys = catalog.getTableDesc(getCurrentDatabase(), tableName); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(2, orderKeys.getStats().getNumRows().intValue()); + } + executeString("DROP TABLE " + tableName + " PURGE"); + } + + @Test + public final void testInsertOverwriteCapitalTableName() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("testInsertOverwriteCapitalTableName"); + ResultSet res = executeString("create table " + tableName + " as select * from default.lineitem"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), tableName)); + + res = executeString("insert overwrite into " + tableName + " select * from default.lineitem where l_orderkey = 3"); + res.close(); + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(2, desc.getStats().getNumRows().intValue()); + } + 
executeString("DROP TABLE " + tableName + " PURGE"); + } + + @Test + public final void testInsertOverwriteLocation() throws Exception { + ResultSet res = executeQuery(); + res.close(); + FileSystem fs = FileSystem.get(testingCluster.getConfiguration()); + assertTrue(fs.exists(new Path("/tajo-data/testInsertOverwriteCapitalTableName"))); + assertEquals(1, fs.listStatus(new Path("/tajo-data/testInsertOverwriteCapitalTableName")).length); + } + + @Test + public final void testInsertOverwriteWithCompression() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("testInsertOverwriteWithCompression"); + ResultSet res = executeFile("testInsertOverwriteWithCompression_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), tableName)); + + res = executeQuery(); + res.close(); + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(2, desc.getStats().getNumRows().intValue()); + } + + FileSystem fs = FileSystem.get(testingCluster.getConfiguration()); + assertTrue(fs.exists(new Path(desc.getUri()))); + CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration()); + + for (FileStatus file : fs.listStatus(new Path(desc.getUri()))) { + CompressionCodec codec = factory.getCodec(file.getPath()); + assertTrue(codec instanceof DeflateCodec); + } + executeString("DROP TABLE " + tableName + " PURGE"); + } + + @Test + public final void testInsertOverwriteLocationWithCompression() throws Exception { + if (!testingCluster.isHiveCatalogStoreRunning()) { + ResultSet res = executeQuery(); + res.close(); + FileSystem fs = FileSystem.get(testingCluster.getConfiguration()); + Path path = new Path("/tajo-data/testInsertOverwriteLocationWithCompression"); + assertTrue(fs.exists(path)); + assertEquals(1, fs.listStatus(path).length); + + CompressionCodecFactory 
factory = new CompressionCodecFactory(testingCluster.getConfiguration()); + for (FileStatus file : fs.listStatus(path)){ + CompressionCodec codec = factory.getCodec(file.getPath()); + assertTrue(codec instanceof DeflateCodec); + } + } + } + + @Test + public final void testInsertOverwriteWithAsteriskUsingParquet() throws Exception { + if (!testingCluster.isHiveCatalogStoreRunning()) { + ResultSet res = executeFile("full_table_parquet_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "full_table_parquet")); + + res = executeString( + "insert overwrite into full_table_parquet select * from default.lineitem where l_orderkey = 3"); + res.close(); + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "full_table_parquet"); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(2, desc.getStats().getNumRows().intValue()); + } + + res = executeString("select * from full_table_parquet;"); + assertResultSet(res); + + res = executeString("select l_orderkey, l_partkey from full_table_parquet;"); + assertResultSet(res, "testInsertOverwriteWithAsteriskUsingParquet2.result"); + + executeString("DROP TABLE full_table_parquet PURGE"); + } + } + + @Test + public final void testInsertOverwriteIntoParquet() throws Exception { + if (!testingCluster.isHiveCatalogStoreRunning()) { + executeString("create table parquet_table " + + "(l_orderkey int4, l_shipdate text, l_shipdate_function text) using parquet").close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "parquet_table")); + + executeString( + "insert overwrite into parquet_table " + + "select l_orderkey, l_shipdate, substr(l_shipdate, 1, 10) from default.lineitem").close(); + + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "parquet_table"); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(5, 
desc.getStats().getNumRows().intValue()); + } + + ResultSet res = executeString("select l_orderkey, l_shipdate, l_shipdate_function " + + "from parquet_table "); + + String expected = "l_orderkey,l_shipdate,l_shipdate_function\n" + + "-------------------------------\n" + + "1,1996-03-13,1996-03-13\n" + + "1,1996-04-12,1996-04-12\n" + + "2,1997-01-28,1997-01-28\n" + + "3,1994-02-02,1994-02-02\n" + + "3,1993-11-09,1993-11-09\n"; + + assertEquals(expected, resultSetToString(res)); + + executeString("DROP TABLE parquet_table PURGE"); + } + } + + @Test + public final void testInsertOverwriteIntoPartitionedParquet() throws Exception { + if (!testingCluster.isHiveCatalogStoreRunning()) { + executeString("create table parquet_table " + + "(l_orderkey int4, l_shipdate_function text) using parquet partition by column (l_shipdate text)").close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "parquet_table")); + + executeString( + "insert overwrite into parquet_table " + + "select l_orderkey, substr(l_shipdate, 1, 10), l_shipdate from default.lineitem").close(); + + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "parquet_table"); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(5, desc.getStats().getNumRows().intValue()); + } + + ResultSet res = executeString("select l_orderkey, l_shipdate, l_shipdate_function " + + "from parquet_table "); + + String expected = "l_orderkey,l_shipdate,l_shipdate_function\n" + + "-------------------------------\n" + + "3,1993-11-09,1993-11-09\n" + + "3,1994-02-02,1994-02-02\n" + + "1,1996-03-13,1996-03-13\n" + + "1,1996-04-12,1996-04-12\n" + + "2,1997-01-28,1997-01-28\n"; + + assertEquals(expected, resultSetToString(res)); + + executeString("DROP TABLE parquet_table PURGE"); + } + } + + @Test + public final void testInsertOverwriteWithDatabase() throws Exception { + ResultSet res = executeFile("table1_ddl.sql"); + res.close(); + + 
CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "table1")); + + res = executeQuery(); + res.close(); + + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1"); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(5, desc.getStats().getNumRows().intValue()); + } + executeString("DROP TABLE table1 PURGE"); + } + + @Test + public final void testInsertOverwriteTableWithNonFromQuery() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery"); + ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)"); + res.close(); + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), tableName)); + + res = executeString("insert overwrite into " + tableName + + " select 1::INT4, 2.1::FLOAT4, 'test'; "); + + res.close(); + + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(1, desc.getStats().getNumRows().intValue()); + } + + res = executeString("select * from " + tableName + ";"); + assertTrue(res.next()); + + assertEquals(3, res.getMetaData().getColumnCount()); + assertEquals(1, res.getInt(1)); + assertEquals(2.1f, res.getFloat(2), 10); + assertEquals("test", res.getString(3)); + + res.close(); + executeString("DROP TABLE " + tableName + " PURGE"); + } + + @Test + public final void testInsertOverwriteTableWithNonFromQuery2() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery2"); + ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)"); + res.close(); + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), tableName)); + res = executeString("insert overwrite into " + tableName + " 
(col1, col3) select 1::INT4, 'test';"); + res.close(); + + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName); + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(1, desc.getStats().getNumRows().intValue()); + } + + res = executeString("select * from " + tableName + ";"); + assertTrue(res.next()); + + assertEquals(3, res.getMetaData().getColumnCount()); + assertEquals(1, res.getInt(1)); + assertNull(res.getString(2)); + assertEquals(0.0, res.getDouble(2), 10); + assertEquals("test", res.getString(3)); + + res.close(); + executeString("DROP TABLE " + tableName + " PURGE"); + } + + @Test + public final void testInsertOverwritePathWithNonFromQuery() throws Exception { + ResultSet res = executeString("insert overwrite into location " + + "'/tajo-data/testInsertOverwritePathWithNonFromQuery' " + + "USING csv WITH ('text.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + + "select 1::INT4, 2.1::FLOAT4, 'test'"); + + res.close(); + FileSystem fs = FileSystem.get(testingCluster.getConfiguration()); + Path path = new Path("/tajo-data/testInsertOverwritePathWithNonFromQuery"); + assertTrue(fs.exists(path)); + assertEquals(1, fs.listStatus(path).length); + + CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration()); + FileStatus file = fs.listStatus(path)[0]; + CompressionCodec codec = factory.getCodec(file.getPath()); + assertTrue(codec instanceof DeflateCodec); + + BufferedReader reader = new BufferedReader( + new InputStreamReader(codec.createInputStream(fs.open(file.getPath())))); + + try { + String line = reader.readLine(); + assertNotNull(line); + + String[] tokens = line.split("\\|"); + + assertEquals(3, tokens.length); + assertEquals("1", tokens[0]); + assertEquals("2.1", tokens[1]); + assertEquals("test", tokens[2]); + } finally { + reader.close(); + } + } + + @Test + public final void testInsertOverwriteWithUnion() throws Exception { + ResultSet res = 
executeFile("table1_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "table1")); + + res = executeFile("testInsertOverwriteWithUnion.sql"); + res.close(); + + String tableDatas = getTableFileContents("table1"); + + String expected = "1|1|17.0\n" + + "1|1|36.0\n" + + "2|2|38.0\n" + + "3|2|45.0\n" + + "3|3|49.0\n" + + "1|3|173665.47\n" + + "2|4|46929.18\n" + + "3|2|193846.25\n"; + + assertNotNull(tableDatas); + assertEquals(expected, tableDatas); + + executeString("DROP TABLE table1 PURGE"); + } + + @Test + public final void testInsertOverwriteWithUnionDifferentAlias() throws Exception { + ResultSet res = executeFile("table1_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "table1")); + + res = executeFile("testInsertOverwriteWithUnionDifferentAlias.sql"); + res.close(); + + String tableDatas = getTableFileContents("table1"); + + String expected = "1|1|17.0\n" + + "1|1|36.0\n" + + "2|2|38.0\n" + + "3|2|45.0\n" + + "3|3|49.0\n" + + "1|3|173665.47\n" + + "2|4|46929.18\n" + + "3|2|193846.25\n"; + + assertNotNull(tableDatas); + assertEquals(expected, tableDatas); + + executeString("DROP TABLE table1 PURGE"); + } + + @Test + public final void testInsertOverwriteLocationWithUnion() throws Exception { + ResultSet res = executeFile("testInsertOverwriteLocationWithUnion.sql"); + res.close(); + + String resultDatas= getTableFileContents(new Path("/tajo-data/testInsertOverwriteLocationWithUnion")); + + String expected = "1|1|17.0\n" + + "1|1|36.0\n" + + "2|2|38.0\n" + + "3|2|45.0\n" + + "3|3|49.0\n" + + "1|3|173665.47\n" + + "2|4|46929.18\n" + + "3|2|193846.25\n"; + + assertNotNull(resultDatas); + assertEquals(expected, resultDatas); + } + + @Test + public final void testInsertOverwriteLocationWithUnionDifferenceAlias() throws Exception { + ResultSet res = 
executeFile("testInsertOverwriteLocationWithUnionDifferenceAlias.sql"); + res.close(); + + String resultDatas= getTableFileContents(new Path("/tajo-data/testInsertOverwriteLocationWithUnionDifferenceAlias")); + + String expected = "1|1|17.0\n" + + "1|1|36.0\n" + + "2|2|38.0\n" + + "3|2|45.0\n" + + "3|3|49.0\n" + + "1|3|173665.47\n" + + "2|4|46929.18\n" + + "3|2|193846.25\n"; + + assertNotNull(resultDatas); + assertEquals(expected, resultDatas); + } + + @Test + public final void testInsertWithDifferentColumnOrder() throws Exception { + ResultSet res = executeFile("nation_diff_col_order.ddl"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "nation_diff")); + + try { + res = executeFile("testInsertWithDifferentColumnOrder.sql"); + res.close(); + + res = executeString("select * from nation_diff"); + assertResultSet(res); + } finally { + executeString("drop table nation_diff purge;"); + } + } + + @Test + public final void testFixedCharSelectWithNoLength() throws Exception { + ResultSet res = executeFile("test1_nolength_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "test1")); + + res = executeFile("testInsertIntoSelectWithFixedSizeCharWithNoLength.sql"); + res.close(); + + //remove \0 + String resultDatas = getTableFileContents("test1").replaceAll("\0",""); + String expected = "a\n"; + + assertNotNull(resultDatas); + assertEquals(expected.length(), resultDatas.length()); + assertEquals(expected, resultDatas); + executeString("DROP TABLE test1 PURGE"); + } + + @Test + public final void testFixedCharSelect() throws Exception { + ResultSet res = executeFile("test1_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "test1")); + + res = 
executeFile("testInsertIntoSelectWithFixedSizeChar.sql"); + res.close(); + + //remove \0 + String resultDatas = getTableFileContents("test1").replaceAll("\0",""); + String expected = "a\n" + + "abc\n" + + "abcde\n"; + + assertNotNull(resultDatas); + assertEquals(expected.length(), resultDatas.length()); + assertEquals(expected, resultDatas); + executeString("DROP TABLE test1 PURGE"); + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java new file mode 100644 index 0000000..d2585a7 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.query; + +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.NamedTest; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.annotation.NotThreadSafe; +import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.CatalogUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.sql.ResultSet; +import java.sql.SQLException; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/* + * NOTE: Plan tests are disabled in TestJoinOnPartitionedTables. + * A plan reading partitioned table currently contains HDFS paths to input partitions. + * An example form of path to an input partition is hdfs://localhost:60305/tajo/warehouse/default/customer_parts/c_nationkey=1. + * Here, the different HDFS port is used for each test run, it is difficult to test query plans that read partitioned table. 
+ */ +@Category(IntegrationTest.class) +@RunWith(Parameterized.class) +@NamedTest("TestJoinQuery") +@NotThreadSafe +public class TestJoinOnPartitionedTables extends TestJoinQuery { + + public TestJoinOnPartitionedTables(String joinOption) throws Exception { + super(joinOption); + } + + @BeforeClass + public static void setup() throws Exception { + TestJoinQuery.setup(); + client.executeQuery("CREATE TABLE if not exists customer_parts " + + "(c_custkey INT4, c_name TEXT, c_address TEXT, c_phone TEXT, c_acctbal FLOAT8, c_mktsegment TEXT, c_comment TEXT) " + + "PARTITION BY COLUMN (c_nationkey INT4) as " + + "SELECT c_custkey, c_name, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey FROM customer;"); + client.executeQueryAndGetResult("create table if not exists nation_partitioned (n_name text) " + + "partition by column(n_nationkey int4, n_regionkey int4) " + + "as select n_name, n_nationkey, n_regionkey from nation"); + addEmptyDataFile("nation_partitioned", true); + } + + @AfterClass + public static void classTearDown() throws SQLException { + TestJoinQuery.classTearDown(); + client.executeQuery("DROP TABLE IF EXISTS customer_parts PURGE"); + client.executeQuery("DROP TABLE IF EXISTS nation_partitioned PURGE"); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true) + @SimpleTest() + public void testPartitionTableJoinSmallTable() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true) + @SimpleTest() + public void testNoProjectionJoinQual() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true) + @SimpleTest() + public void testPartialFilterPushDown() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true) + @SimpleTest() + public void testPartialFilterPushDownOuterJoin() throws 
Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true) + @SimpleTest() + public void testPartialFilterPushDownOuterJoin2() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true) + @SimpleTest() + public void selfJoinOfPartitionedTable() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true) + @SimpleTest(queries = { + @QuerySpec("select a.c_custkey, b.c_custkey from " + + " (select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " + + " union all " + + " select c_custkey, c_nationkey from customer_parts where c_nationkey < 0 " + + ") a " + + "left outer join customer_parts b " + + "on a.c_custkey = b.c_custkey " + + "and a.c_nationkey > 0") + }) + public void testPartitionMultiplePartitionFilter() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true) + @SimpleTest() + public void testFilterPushDownPartitionColumnCaseWhen() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public void testMultiplePartitionedBroadcastDataFileWithZeroLength() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true) + @SimpleTest() + public void testMultiplePartitionedBroadcastDataFileWithZeroLength2() throws Exception { + runSimpleTests(); + } + + @Test + public final void testCasebyCase1() throws Exception { + // Left outer join with a small table and a large partition table which not matched any partition path. 
+ String tableName = CatalogUtil.normalizeIdentifier("largePartitionedTable"); + executeString( + "create table " + tableName + " (l_partkey int4, l_suppkey int4, l_linenumber int4, \n" + + "l_quantity float8, l_extendedprice float8, l_discount float8, l_tax float8, \n" + + "l_returnflag text, l_linestatus text, l_shipdate text, l_commitdate text, \n" + + "l_receiptdate text, l_shipinstruct text, l_shipmode text, l_comment text) \n" + + "partition by column(l_orderkey int4) ").close(); + + try { + executeString("insert overwrite into " + tableName + + " select l_partkey, l_suppkey, l_linenumber, \n" + + " l_quantity, l_extendedprice, l_discount, l_tax, \n" + + " l_returnflag, l_linestatus, l_shipdate, l_commitdate, \n" + + " l_receiptdate, l_shipinstruct, l_shipmode, l_comment, l_orderkey from lineitem"); + + ResultSet res = executeString( + "select a.l_orderkey as key1, b.l_orderkey as key2 from lineitem as a " + + "left outer join " + tableName + " b " + + "on a.l_partkey = b.l_partkey and b.l_orderkey = 1000" + ); + + String expected = "key1,key2\n" + + "-------------------------------\n" + + "1,null\n" + + "1,null\n" + + "2,null\n" + + "3,null\n" + + "3,null\n"; + assertEquals(expected, resultSetToString(res)); + cleanupQuery(res); + } finally { + executeString("drop table " + tableName + " purge"); + } + } + + // TODO: This test should be reverted after resolving TAJO-1600 +// @Test + public final void testBroadcastMultiColumnPartitionTable() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("testBroadcastMultiColumnPartitionTable"); + ResultSet res = testBase.execute( + "create table " + tableName + " (col1 int4, col2 float4) partition by column(col3 text, col4 text) "); + res.close(); + TajoTestingCluster cluster = testBase.getTestingCluster(); + CatalogService catalog = cluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + + try { + res = executeString("insert overwrite into " + 
tableName + + " select o_orderkey, o_totalprice, substr(o_orderdate, 6, 2), substr(o_orderdate, 1, 4) from orders"); + res.close(); + + res = executeString( + "select distinct a.col3 from " + tableName + " as a " + + "left outer join lineitem b " + + "on a.col1 = b.l_orderkey order by a.col3" + ); + + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("drop table " + tableName + " purge"); + } + } + + @Test + public final void testSelfJoin() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("paritioned_nation"); + ResultSet res = executeString( + "create table " + tableName + " (n_name text," + + " n_comment text, n_regionkey int8) USING text " + + "WITH ('text.delimiter'='|')" + + "PARTITION BY column(n_nationkey int8)"); + res.close(); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + + try { + res = executeString( + "insert overwrite into " + tableName + + " select n_name, n_comment, n_regionkey, n_nationkey from nation"); + res.close(); + + res = executeString( + "select a.n_nationkey, a.n_name from nation a join nation b on a.n_nationkey = b.n_nationkey" + + " where a.n_nationkey in (1)"); + String expected = resultSetToString(res); + res.close(); + + res = executeString( + "select a.n_nationkey, a.n_name from " + tableName + " a join " + tableName + + " b on a.n_nationkey = b.n_nationkey " + + " where a.n_nationkey in (1)"); + String resultSetData = resultSetToString(res); + res.close(); + + assertEquals(expected, resultSetData); + cleanupQuery(res); + } finally { + executeString("drop table " + tableName + " purge"); + } + } + + @Test + public final void testSelfJoin2() throws Exception { + /* + https://issues.apache.org/jira/browse/TAJO-1102 + See the following case. 
+ CREATE TABLE orders_partition + (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT, + o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING TEXT WITH ('text.delimiter'='|') + PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT); + + select a.o_orderstatus, count(*) as cnt + from orders_partition a + inner join orders_partition b + on a.o_orderdate = b.o_orderdate + and a.o_orderstatus = b.o_orderstatus + and a.o_orderkey = b.o_orderkey + where a.o_orderdate='1995-02-21' + and a.o_orderstatus in ('F') + group by a.o_orderstatus; + + Because of the where condition[where a.o_orderdate='1995-02-21 and a.o_orderstatus in ('F')], + orders_partition table aliased a is small and broadcast target. + */ + String tableName = CatalogUtil.normalizeIdentifier("partitioned_orders"); + ResultSet res = executeString( + "create table " + tableName + " (o_orderkey INT8, o_custkey INT8, o_totalprice FLOAT8, o_orderpriority TEXT,\n" + + "o_clerk TEXT, o_shippriority INT4, o_comment TEXT) USING TEXT WITH ('text.delimiter'='|')\n" + + "PARTITION BY COLUMN(o_orderdate TEXT, o_orderstatus TEXT, o_orderkey_mod INT8)"); + res.close(); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + + try { + res = executeString( + "insert overwrite into " + tableName + + " select o_orderkey, o_custkey, o_totalprice, " + + " o_orderpriority, o_clerk, o_shippriority, o_comment, o_orderdate, o_orderstatus, o_orderkey % 10 " + + " from orders "); + res.close(); + + res = executeString( + "select a.o_orderdate, a.o_orderstatus, a.o_orderkey % 10 as o_orderkey_mod, a.o_totalprice " + + "from orders a " + + "join orders b on a.o_orderkey = b.o_orderkey " + + "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey % 10 = 1" + + " order by a.o_orderkey" + ); + String expected = resultSetToString(res); + res.close(); + + res = executeString( + "select a.o_orderdate, a.o_orderstatus, a.o_orderkey_mod, a.o_totalprice " + + "from " + 
tableName + + " a join " + tableName + " b on a.o_orderkey = b.o_orderkey " + + "where a.o_orderdate = '1993-10-14' and a.o_orderstatus = 'F' and a.o_orderkey_mod = 1 " + + " order by a.o_orderkey" + ); + String resultSetData = resultSetToString(res); + res.close(); + + cleanupQuery(res); + assertEquals(expected, resultSetData); + } finally { + executeString("drop table " + tableName + " purge"); + } + } + + @Test + @Option(withExplain = false, withExplainGlobal = false, parameterized = true) + @SimpleTest() + public final void testBroadcastPartitionTable() throws Exception { + // If all tables participate in the BROADCAST JOIN, there is some missing data. + executeDDL("customer_partition_ddl.sql", null); + ResultSet res = executeFile("insert_into_customer_partition.sql"); + res.close(); + + try { + runSimpleTests(); + } finally { + executeString("DROP TABLE customer_broad_parts PURGE"); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java new file mode 100644 index 0000000..2fddbfa --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.query; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.Int4Datum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.storage.*; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.KeyValueSet; +import org.junit.runners.Parameterized.Parameters; + +import java.io.File; +import java.io.OutputStream; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestJoinQuery extends QueryTestCaseBase { + private static final Log LOG = LogFactory.getLog(TestJoinQuery.class); + private static int reference = 0; + + public TestJoinQuery(String joinOption) throws Exception { + super(TajoConstants.DEFAULT_DATABASE_NAME, joinOption); + + 
testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "true"); + testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname, "" + (5 * 1024)); + + testingCluster.setAllTajoDaemonConfValue( + ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, + ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal); + + testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, + ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal); + testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname, + ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultVal); + + if (joinOption.indexOf("NoBroadcast") >= 0) { + testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false"); + testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname, "-1"); + } + + if (joinOption.indexOf("Hash") >= 0) { + testingCluster.setAllTajoDaemonConfValue( + ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(256 * 1048576)); + testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, + String.valueOf(256 * 1048576)); + testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname, + String.valueOf(256 * 1048576)); + } + if (joinOption.indexOf("Sort") >= 0) { + testingCluster.setAllTajoDaemonConfValue( + ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(1)); + testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, + String.valueOf(1)); + testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname, + String.valueOf(1)); + } + } + + @Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][]{ + {"Hash_NoBroadcast"}, + {"Sort_NoBroadcast"}, + {"Hash"}, + 
{"Sort"}, + }); + } + + public static void setup() throws Exception { + if (reference++ == 0) { + createCommonTables(); + } + } + + public static void classTearDown() throws SQLException { + testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, + ConfVars.$TEST_BROADCAST_JOIN_ENABLED.defaultVal); + testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname, + ConfVars.$DIST_QUERY_BROADCAST_JOIN_THRESHOLD.defaultVal); + + testingCluster.setAllTajoDaemonConfValue( + ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, + ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal); + + testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, + ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.defaultVal); + testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname, + ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultVal); + + if (--reference == 0) { + dropCommonTables(); + } + } + + protected static void createCommonTables() throws Exception { + LOG.info("Create common tables for join tests"); + + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", TajoDataTypes.Type.INT4); + schema.addColumn("name", TajoDataTypes.Type.TEXT); + String[] data = new String[]{"1|table11-1", "2|table11-2", "3|table11-3", "4|table11-4", "5|table11-5"}; + TajoTestingCluster.createTable("jointable11", schema, tableOptions, data, 2); + + schema = new Schema(); + schema.addColumn("id", TajoDataTypes.Type.INT4); + schema.addColumn("name", TajoDataTypes.Type.TEXT); + data = new String[]{"1|table12-1", "2|table12-2"}; + TajoTestingCluster.createTable("jointable12", schema, tableOptions, data, 2); + + schema = new Schema(); + schema.addColumn("id", 
TajoDataTypes.Type.INT4); + schema.addColumn("name", TajoDataTypes.Type.TEXT); + data = new String[]{"2|table13-2", "3|table13-3"}; + TajoTestingCluster.createTable("jointable13", schema, tableOptions, data); + + schema = new Schema(); + schema.addColumn("id", TajoDataTypes.Type.INT4); + schema.addColumn("name", TajoDataTypes.Type.TEXT); + data = new String[]{"1|table14-1", "2|table14-2", "3|table14-3", "4|table14-4"}; + TajoTestingCluster.createTable("jointable14", schema, tableOptions, data); + + schema = new Schema(); + schema.addColumn("id", TajoDataTypes.Type.INT4); + schema.addColumn("name", TajoDataTypes.Type.TEXT); + data = new String[]{}; + TajoTestingCluster.createTable("jointable15", schema, tableOptions, data); + + schema = new Schema(); + schema.addColumn("id", TajoDataTypes.Type.INT4); + schema.addColumn("name", TajoDataTypes.Type.TEXT); + data = new String[]{"1000000|a", "1000001|b", "2|c", "3|d", "4|e"}; + TajoTestingCluster.createTable("jointable1", schema, tableOptions, data, 1); + + data = new String[10000]; + for (int i = 0; i < data.length; i++) { + data[i] = i + "|" + "this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable" + i; + } + TajoTestingCluster.createTable("jointable_large", schema, tableOptions, data, 2); + + // According to node type(leaf or non-leaf) Broadcast join is determined differently by Repartitioner. 
+ // testMultipleBroadcastDataFileWithZeroLength testcase is for the leaf node + createMultiFile("nation", 2, new TupleCreator() { + public Tuple createTuple(String[] columnDatas) { + return new VTuple(new Datum[]{ + new Int4Datum(Integer.parseInt(columnDatas[0])), + new TextDatum(columnDatas[1]), + new Int4Datum(Integer.parseInt(columnDatas[2])), + new TextDatum(columnDatas[3]) + }); + } + }); + addEmptyDataFile("nation_multifile", false); + } + + protected static void dropCommonTables() throws SQLException { + LOG.info("Clear common tables for join tests"); + + client.executeQuery("DROP TABLE IF EXISTS jointable11 PURGE;"); + client.executeQuery("DROP TABLE IF EXISTS jointable12 PURGE;"); + client.executeQuery("DROP TABLE IF EXISTS jointable13 PURGE;"); + client.executeQuery("DROP TABLE IF EXISTS jointable14 PURGE;"); + client.executeQuery("DROP TABLE IF EXISTS jointable15 PURGE;"); + client.executeQuery("DROP TABLE IF EXISTS jointable1 PURGE"); + client.executeQuery("DROP TABLE IF EXISTS jointable_large PURGE"); + client.executeQuery("DROP TABLE IF EXISTS nation_multifile PURGE"); + } + + interface TupleCreator { + Tuple createTuple(String[] columnDatas); + } + + private static String buildSchemaString(String tableName) throws TajoException { + TableDesc desc = client.getTableDesc(tableName); + StringBuffer sb = new StringBuffer(); + for (Column column : desc.getSchema().getRootColumns()) { + sb.append(column.getSimpleName()).append(" ").append(column.getDataType().getType()); + TajoDataTypes.DataType dataType = column.getDataType(); + if (dataType.getLength() > 0) { + sb.append("(").append(dataType.getLength()).append(")"); + } + sb.append(","); + } + sb.deleteCharAt(sb.length()-1); + return sb.toString(); + } + + private static String buildMultifileDDlString(String tableName) throws TajoException { + String multiTableName = tableName + "_multifile"; + StringBuilder sb = new StringBuilder("create table ").append(multiTableName).append(" ("); + 
sb.append(buildSchemaString(tableName)).append(" )"); + return sb.toString(); + } + + protected static void createMultiFile(String tableName, int numRowsEachFile, TupleCreator tupleCreator) throws Exception { + // make multiple small file + String multiTableName = tableName + "_multifile"; + String sql = buildMultifileDDlString(tableName); + client.executeQueryAndGetResult(sql); + + TableDesc table = client.getTableDesc(multiTableName); + assertNotNull(table); + + TableMeta tableMeta = table.getMeta(); + Schema schema = table.getLogicalSchema(); + + File file = new File("src/test/tpch/" + tableName + ".tbl"); + + if (!file.exists()) { + file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + tableName + ".tbl"); + } + String[] rows = FileUtil.readTextFile(file).split("\n"); + + assertTrue(rows.length > 0); + + int fileIndex = 0; + + Appender appender = null; + for (int i = 0; i < rows.length; i++) { + if (i % numRowsEachFile == 0) { + if (appender != null) { + appender.flush(); + appender.close(); + } + Path dataPath = new Path(table.getUri().toString(), fileIndex + ".csv"); + fileIndex++; + appender = (((FileTablespace) TablespaceManager.getLocalFs())) + .getAppender(tableMeta, schema, dataPath); + appender.init(); + } + String[] columnDatas = rows[i].split("\\|"); + Tuple tuple = tupleCreator.createTuple(columnDatas); + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + } + + protected static void addEmptyDataFile(String tableName, boolean isPartitioned) throws Exception { + TableDesc table = client.getTableDesc(tableName); + + Path path = new Path(table.getUri()); + FileSystem fs = path.getFileSystem(conf); + if (isPartitioned) { + List<Path> partitionPathList = getPartitionPathList(fs, path); + for (Path eachPath: partitionPathList) { + Path dataPath = new Path(eachPath, 0 + "_empty.csv"); + OutputStream out = fs.create(dataPath); + out.close(); + } + } else { + Path dataPath = new Path(path, 0 + "_empty.csv"); + 
OutputStream out = fs.create(dataPath); + out.close(); + } + } + + protected static List<Path> getPartitionPathList(FileSystem fs, Path path) throws Exception { + FileStatus[] files = fs.listStatus(path); + List<Path> paths = new ArrayList<Path>(); + if (files != null) { + for (FileStatus eachFile: files) { + if (eachFile.isFile()) { + paths.add(path); + return paths; + } else { + paths.addAll(getPartitionPathList(fs, eachFile.getPath())); + } + } + } + + return paths; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java new file mode 100644 index 0000000..d3cde3d --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.query; + +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.NamedTest; +import org.apache.tajo.QueryTestCaseBase; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.sql.SQLException; + +@Category(IntegrationTest.class) +@RunWith(Parameterized.class) +@NamedTest("TestJoinQuery") +public class TestMultipleJoinTypes extends TestJoinQuery { + + public TestMultipleJoinTypes(String joinOption) throws Exception { + super(joinOption); + } + + @BeforeClass + public static void setup() throws Exception { + TestJoinQuery.setup(); + } + + @AfterClass + public static void classTearDown() throws SQLException { + TestJoinQuery.classTearDown(); + } + + @Test + @QueryTestCaseBase.Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @QueryTestCaseBase.SimpleTest() + public final void testJoinWithMultipleJoinTypes() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testComplexJoinsWithCaseWhen() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest() + public void testComplexJoinsWithCaseWhen2() throws Exception { + runSimpleTests(); + } + + @Test + @Option(withExplain = true, withExplainGlobal = true, parameterized = true) + @SimpleTest(prepare = { + "CREATE TABLE customer_broad_parts (" + + " c_nationkey INT4," + + " c_name TEXT," + + " c_address TEXT," + + " c_phone TEXT," + + " c_acctbal FLOAT8," + + " c_mktsegment TEXT," + + " c_comment TEXT" + + ") PARTITION BY COLUMN (c_custkey INT4)", + "INSERT OVERWRITE INTO customer_broad_parts" + + " SELECT" + + " c_nationkey," + + " c_name," + + " c_address," + + " c_phone," + + " c_acctbal," 
+ + " c_mktsegment," + + " c_comment," + + " c_custkey" + + " FROM customer" + }, cleanup = { + "DROP TABLE customer_broad_parts PURGE" + }, queries = { + @QuerySpec("select a.l_orderkey, b.o_orderkey, c.c_custkey from lineitem a " + + "inner join orders b on a.l_orderkey = b.o_orderkey " + + "left outer join customer_broad_parts c on a.l_orderkey = c.c_custkey and c.c_custkey < 0") + }) + public final void testInnerAndOuterWithEmpty() throws Exception { + runSimpleTests(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java new file mode 100644 index 0000000..bd8f830 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.query; + +import org.apache.tajo.QueryTestCaseBase; +import org.junit.Before; +import org.junit.Test; + +import java.sql.ResultSet; + +public class TestNetTypes extends QueryTestCaseBase { + + @Before + public final void setUp() throws Exception { + if (!testingCluster.isHiveCatalogStoreRunning()) { + executeDDL("table1_ddl.sql", "table1"); + executeDDL("table2_ddl.sql", "table2"); + } + } + + @Test + public final void testSelect() throws Exception { + // Skip all tests when HiveCatalogStore is used. + if (!testingCluster.isHiveCatalogStoreRunning()) { + // select name, addr from table1; + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + } + + @Test + public final void testGroupby() throws Exception { + // Skip all tests when HiveCatalogStore is used. + if (!testingCluster.isHiveCatalogStoreRunning()) { + // select name, addr, count(1) from table1 group by name, addr; + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + } + + @Test + public final void testGroupby2() throws Exception { + // Skip all tests when HiveCatalogStore is used. + if (!testingCluster.isHiveCatalogStoreRunning()) { + // select addr, count(*) from table1 group by addr; + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + } + + @Test + public final void testSort() throws Exception { + // Skip all tests when HiveCatalogStore is used. + if (!testingCluster.isHiveCatalogStoreRunning()) { + // select * from table1 order by addr; + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + } + + @Test + public final void testSort2() throws Exception { + // Skip all tests when HiveCatalogStore is used. 
+ if (!testingCluster.isHiveCatalogStoreRunning()) { + // select addr from table2 order by addr; + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + } + + @Test + public final void testJoin() throws Exception { + // Skip all tests when HiveCatalogStore is used. + if (!testingCluster.isHiveCatalogStoreRunning()) { + // select * from table1 as t1, table2 as t2 where t1.addr = t2.addr; + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java new file mode 100644 index 0000000..66848e6 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestNullValues.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for the handling of NULL values. These tests need specialized data sets, so they
+ * are kept separate from the unit tests that use the TPC-H data set.
+ */
+@Category(IntegrationTest.class)
+public class TestNullValues {
+
+  /**
+   * Tolerance for floating-point assertions. JDBC getters return 0 for SQL NULL, so the
+   * tolerance must be tight; a loose one (e.g. 10) would accept almost any value.
+   */
+  private static final double DELTA = 1e-9;
+
+  private static TajoClient client;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    client = TpchTestBase.getInstance().getTestingCluster().newTajoClient();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    // setUp() may have failed before the client was assigned.
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  /** An empty trailing FLOAT4 field is read as NULL; only row 2 matches {@code col3 is null}. */
+  @Test
+  public final void testIsNull() throws Exception {
+    String [] table = new String[] {"nulltable1"};
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT4);
+    schema.addColumn("col2", Type.TEXT);
+    schema.addColumn("col3", Type.FLOAT4);
+    Schema [] schemas = new Schema[] {schema};
+    String [] data = {
+        "1|filled|0.1",
+        "2||",
+        "3|filled|0.2"
+    };
+    KeyValueSet opts = new KeyValueSet();
+    opts.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    ResultSet res = TajoTestingCluster
+        .run(table, schemas, opts, new String[][]{data},
+            "select * from nulltable1 where col3 is null", client);
+
+    try {
+      assertTrue(res.next());
+      assertEquals(2, res.getInt(1));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+  }
+
+  /** The row whose INT4 field is empty is excluded by {@code col1 is not null}. */
+  @Test
+  public final void testIsNotNull() throws Exception {
+    String [] table = new String[] {"nulltable2"};
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT4);
+    schema.addColumn("col2", Type.TEXT);
+    Schema [] schemas = new Schema[] {schema};
+    String [] data = {
+        "1|filled|",
+        "||",
+        "3|filled|"
+    };
+    KeyValueSet opts = new KeyValueSet();
+    opts.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    ResultSet res = TajoTestingCluster
+        .run(table, schemas, opts, new String[][]{data},
+            "select * from nulltable2 where col1 is not null", client);
+    try {
+      assertTrue(res.next());
+      assertEquals(1, res.getInt(1));
+      assertTrue(res.next());
+      assertEquals(3, res.getInt(1));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+  }
+
+  /** Leading empty INT8 fields in a comma-delimited row are read as NULL. */
+  @Test
+  public final void testIsNotNull2() throws Exception {
+    String [] table = new String[] {"nulltable3"};
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT8);
+    schema.addColumn("col2", Type.INT8);
+    schema.addColumn("col3", Type.INT8);
+    schema.addColumn("col4", Type.INT8);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.INT8);
+    schema.addColumn("col8", Type.INT8);
+    schema.addColumn("col9", Type.INT8);
+    schema.addColumn("col10", Type.INT8);
+    Schema [] schemas = new Schema[] {schema};
+    String [] data = {
+        ",,,,672287821,1301460,1,313895860387,126288907,1024",
+        ",,,43578,19,13,6,3581,2557,1024"
+    };
+    KeyValueSet opts = new KeyValueSet();
+    opts.set(StorageConstants.TEXT_DELIMITER, ",");
+    ResultSet res = TajoTestingCluster
+        .run(table, schemas, opts, new String[][]{data},
+            "select * from nulltable3 where col1 is null and col2 is null and col3 is null and col4 = 43578", client);
+    try {
+      assertTrue(res.next());
+      assertEquals(43578, res.getLong(4));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+  }
+
+  /**
+   * Both an explicit {@code \N} marker and an empty field read as NULL. The rows are shorter than
+   * the 10-column schema; the query expects the missing col5 of the second row to be NULL.
+   */
+  @Test
+  public final void testIsNotNull3() throws Exception {
+    String [] table = new String[] {"nulltable4"};
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT8);
+    schema.addColumn("col2", Type.INT8);
+    schema.addColumn("col3", Type.INT8);
+    schema.addColumn("col4", Type.INT8);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.INT8);
+    schema.addColumn("col8", Type.INT8);
+    schema.addColumn("col9", Type.INT8);
+    schema.addColumn("col10", Type.INT8);
+    Schema [] schemas = new Schema[] {schema};
+    String [] data = {
+        "\\N,,,,672287821,",
+        ",\\N,,43578"
+    };
+    KeyValueSet opts = new KeyValueSet();
+    opts.set(StorageConstants.TEXT_DELIMITER, ",");
+    opts.set(StorageConstants.TEXT_NULL, "\\\\N");
+    ResultSet res = TajoTestingCluster
+        .run(table, schemas, opts, new String[][]{data},
+            "select * from nulltable4 where col1 is null and col2 is null and col3 is null and col5 is null and col4 = 43578"
+            , client);
+    try {
+      assertTrue(res.next());
+      assertEquals(43578, res.getLong(4));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+  }
+
+  /** NULL columns render as "null" via getObject() and yield JDBC defaults from typed getters. */
+  @Test
+  public final void testResultSetNullSimpleQuery() throws Exception {
+    String tableName = "nulltable5";
+    ResultSet res = null;
+
+    try {
+      res = runNullTableQuery(tableName, "select col1, col2, col3, col4 from " + tableName, client);
+      int numRows = 0;
+
+      String expected =
+          "null|a|1.0|true\n" +
+          "2|null|2.0|false\n" +
+          "3|c|null|true\n" +
+          "4|d|4.0|null";
+
+      StringBuilder result = new StringBuilder();
+      String prefix = "";
+      while (res.next()) {
+        for (int i = 0; i < 4; i++) {
+          result.append(prefix).append(res.getObject(i + 1));
+          prefix = "|";
+        }
+        prefix = "\n";
+
+        assertResultSetNull(res, numRows, false, new int[]{1, 2, 3, 4});
+        assertResultSetNull(res, numRows, true, new int[]{1, 2, 3, 4});
+        numRows++;
+      }
+      assertEquals(4, numRows);
+      assertEquals(expected, result.toString());
+    } finally {
+      if (res != null) {
+        res.close();
+      }
+    }
+  }
+
+  /** Like testResultSetNullSimpleQuery, with coalesce() columns interleaved with the raw ones. */
+  @Test
+  public final void testResultSetNull() throws Exception {
+    String tableName = "nulltable6";
+    String query = "select " +
+        "col1, coalesce(col1, 99999), " +
+        "col2, coalesce(col2, 'null_value'), " +
+        "col3, coalesce(col3, 99999.0)," +
+        "col4 " +
+        "from " + tableName;
+
+    ResultSet res = null;
+
+    try {
+      res = runNullTableQuery(tableName, query, client);
+      int numRows = 0;
+      String expected =
+          "null|99999|a|a|1.0|1.0|true\n" +
+          "2|2|null|null_value|2.0|2.0|false\n" +
+          "3|3|c|c|null|99999.0|true\n" +
+          "4|4|d|d|4.0|4.0|null";
+
+      StringBuilder result = new StringBuilder();
+      String prefix = "";
+      while (res.next()) {
+        for (int i = 0; i < 7; i++) {
+          result.append(prefix).append(res.getObject(i + 1));
+          prefix = "|";
+        }
+        prefix = "\n";
+
+        assertResultSetNull(res, numRows, false, new int[]{1, 3, 5, 7});
+        assertResultSetNull(res, numRows, true, new int[]{1, 3, 5, 7});
+        numRows++;
+      }
+      assertEquals(4, numRows);
+      assertEquals(expected, result.toString());
+    } finally {
+      if (res != null) {
+        res.close();
+      }
+    }
+  }
+
+  /**
+   * Creates a 4-column table in which row i (0-based) has a NULL in column i+1, then runs
+   * {@code query} against it.
+   *
+   * @param tableName name of the table to create
+   * @param query     query to run against that table
+   * @param client    client to use; when null, the cluster's run() overload without a client is used
+   * @return the query's result set (the caller must close it)
+   */
+  private ResultSet runNullTableQuery(String tableName, String query, TajoClient client) throws Exception {
+    String [] table = new String[] {tableName};
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT4);
+    schema.addColumn("col2", Type.TEXT);
+    schema.addColumn("col3", Type.FLOAT4);
+    schema.addColumn("col4", Type.BOOLEAN);
+    Schema [] schemas = new Schema[] {schema};
+    String [] data = {
+        "\\N|a|1.0|t",
+        "2|\\N|2.0|f",
+        "3|c|\\N|t",
+        "4|d|4.0|\\N"
+    };
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    if (client == null) {
+      return TajoTestingCluster.run(table, schemas, tableOptions, new String[][]{data}, query);
+    } else {
+      return TajoTestingCluster.run(table, schemas, tableOptions, new String[][]{data}, query, client);
+    }
+  }
+
+  /**
+   * Asserts that the NULL cell of the current row yields the JDBC default of its typed getter:
+   * getInt() == 0 for row 0, getString() == null for row 1, getDouble() == 0.0 for row 2 and
+   * getBoolean() == false for row 3. Rows outside 0..3 are not checked.
+   *
+   * @param res       result set positioned on the row to check
+   * @param numRows   0-based index of the current row
+   * @param useName   look the column up by label instead of by index
+   * @param nullIndex 1-based column index holding the NULL, indexed by row
+   */
+  private void assertResultSetNull(ResultSet res, int numRows, boolean useName, int[] nullIndex) throws SQLException {
+    if (numRows == 0) {
+      if (useName) {
+        assertEquals(0, res.getInt(res.getMetaData().getColumnName(nullIndex[numRows])));
+      } else {
+        assertEquals(0, res.getInt(nullIndex[numRows]));
+      }
+    } else if (numRows == 1) {
+      if (useName) {
+        assertNull(res.getString(res.getMetaData().getColumnName(nullIndex[numRows])));
+      } else {
+        assertNull(res.getString(nullIndex[numRows]));
+      }
+    } else if (numRows == 2) {
+      if (useName) {
+        assertEquals(0.0, res.getDouble(res.getMetaData().getColumnName(nullIndex[numRows])), DELTA);
+      } else {
+        assertEquals(0.0, res.getDouble(nullIndex[numRows]), DELTA);
+      }
+    } else if (numRows == 3) {
+      if (useName) {
+        assertFalse(res.getBoolean(res.getMetaData().getColumnName(nullIndex[numRows])));
+      } else {
+        assertFalse(res.getBoolean(nullIndex[numRows]));
+      }
+    }
+  }
+}
