Repository: tajo Updated Branches: refs/heads/master 9acb3a63c -> b50831ffa
TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa) Closes #478 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b50831ff Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b50831ff Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b50831ff Branch: refs/heads/master Commit: b50831ffa66d94b67a419a95f81beed8fcab8ba2 Parents: 9acb3a6 Author: JaeHwa Jung <[email protected]> Authored: Mon Apr 27 18:27:08 2015 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Mon Apr 27 18:27:08 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../engine/planner/PhysicalPlannerImpl.java | 9 + .../engine/planner/global/GlobalPlanner.java | 26 +- .../tajo/engine/query/TestTablePartitions.java | 455 +++++++++++++------ 4 files changed, 347 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/b50831ff/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e3aec57..b6488c4 100644 --- a/CHANGES +++ b/CHANGES @@ -22,6 +22,8 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa) + TAJO-1548: Refactoring condition code for CHAR into CatalogUtil. (Contributed by DaeMyung Kang, Committed by jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/b50831ff/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index ac1c9ad..f132793 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto; @@ -854,6 +855,14 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { sortSpecs[i++] = new SortSpec(insertNode.getProjectedSchema().getColumn(id), true, false); } } + } else if (storeTableNode.getType() == NodeType.CREATE_TABLE) { + int i = 0; + for (int j = 0; j < partitionKeyColumns.length; j++) { + int id = storeTableNode.getOutSchema().getColumns().size() + j; + Column column = storeTableNode.getInSchema().getColumn(id); + sortSpecs[i++] = new SortSpec(column, true, false); + } + } else { for (int i = 0; i < partitionKeyColumns.length; i++) { sortSpecs[i] = new SortSpec(partitionKeyColumns[i], true, false); http://git-wip-us.apache.org/repos/asf/tajo/blob/b50831ff/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index cd35d96..54b920f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -1132,14 +1132,26 @@ public class GlobalPlanner { Preconditions.checkState(node.hasTargetTable(), "A target table must be a partitioned table."); PartitionMethodDesc partitionMethod = node.getPartitionMethod(); - if (node.getType() == NodeType.INSERT) { - InsertNode insertNode = (InsertNode) node; - channel.setSchema(((InsertNode)node).getProjectedSchema()); - Column [] shuffleKeys = new Column[partitionMethod.getExpressionSchema().size()]; - int i = 0; + if (node.getType() == NodeType.INSERT || node.getType() == NodeType.CREATE_TABLE) { + Schema tableSchema = null, projectedSchema = null; + if (node.getType() == NodeType.INSERT) { + tableSchema = ((InsertNode) node).getTableSchema(); + projectedSchema = ((InsertNode) node).getProjectedSchema(); + } else { + tableSchema = node.getOutSchema(); + projectedSchema = node.getInSchema(); + } + channel.setSchema(projectedSchema); + + Column[] shuffleKeys = new Column[partitionMethod.getExpressionSchema().size()]; + int i = 0, id = 0; for (Column column : partitionMethod.getExpressionSchema().getColumns()) { - int id = insertNode.getTableSchema().getColumnId(column.getQualifiedName()); - shuffleKeys[i++] = insertNode.getProjectedSchema().getColumn(id); + if (node.getType() == NodeType.INSERT) { + id = tableSchema.getColumnId(column.getQualifiedName()); + } else { + id = tableSchema.getColumns().size() + i; + } + shuffleKeys[i++] = projectedSchema.getColumn(id); } channel.setShuffleKeys(shuffleKeys); channel.setShuffleType(SCATTERED_HASH_SHUFFLE); http://git-wip-us.apache.org/repos/asf/tajo/blob/b50831ff/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index bb15bfc..0d98b91 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -46,54 +46,68 @@ import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.worker.TajoWorker; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.sql.ResultSet; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Random; +import java.util.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE; import static org.junit.Assert.*; +@RunWith(Parameterized.class) public class TestTablePartitions extends QueryTestCaseBase { + private NodeType nodeType; - public TestTablePartitions() throws IOException { + public TestTablePartitions(NodeType nodeType) throws IOException { super(TajoConstants.DEFAULT_DATABASE_NAME); + this.nodeType = nodeType; + } + + @Parameterized.Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][] { + //type + {NodeType.INSERT}, + {NodeType.CREATE_TABLE}, + }); } @Test public final void testCreateColumnPartitionedTable() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTable"); - ResultSet res = executeString( + + if (nodeType == NodeType.INSERT) { + res = executeString( "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) "); - res.close(); + res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); - assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); - res = testBase.execute( + res = testBase.execute( "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " + - "l_quantity from lineitem"); + "l_quantity from lineitem"); + } else { + res = testBase.execute( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(key float8) " + + " as select l_orderkey, l_partkey, l_quantity from lineitem"); + } MasterPlan plan = getQueryPlan(res); ExecutionBlock rootEB = plan.getRoot(); - /* - ------------------------------------------------------------------------------- - |-eb_1405354886454_0001_000003 - |-eb_1405354886454_0001_000002 - |-eb_1405354886454_0001_000001 - */ assertEquals(1, plan.getChildCount(rootEB.getId())); ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0); assertNotNull(insertEB); - assertEquals(NodeType.INSERT, insertEB.getPlan().getType()); + + assertEquals(nodeType, insertEB.getPlan().getType()); assertEquals(1, plan.getChildCount(insertEB.getId())); ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0); @@ -105,40 +119,41 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType()); assertEquals(1, channel.getShuffleKeys().length); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @Test public final void testCreateColumnPartitionedTableWithJoin() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithJoin"); - ResultSet res = executeString( + + if (nodeType == NodeType.INSERT) { + res = executeString( "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) "); - res.close(); + res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); - assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); - res = testBase.execute( + res = testBase.execute( "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " + - "l_quantity from lineitem join orders on l_orderkey = o_orderkey"); + "l_quantity from lineitem join orders on l_orderkey = o_orderkey"); + + } else { + res = testBase.execute("create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8) " + + " AS select l_orderkey, l_partkey, l_quantity from lineitem join orders on l_orderkey = o_orderkey"); + } MasterPlan plan = getQueryPlan(res); ExecutionBlock rootEB = plan.getRoot(); - /* - ------------------------------------------------------------------------------- - |-eb_1405356074433_0001_000005 - |-eb_1405356074433_0001_000004 - |-eb_1405356074433_0001_000003 - |-eb_1405356074433_0001_000002 - |-eb_1405356074433_0001_000001 - */ assertEquals(1, plan.getChildCount(rootEB.getId())); ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0); assertNotNull(insertEB); - assertEquals(NodeType.INSERT, insertEB.getPlan().getType()); + assertEquals(nodeType, insertEB.getPlan().getType()); assertEquals(1, plan.getChildCount(insertEB.getId())); ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0); @@ -150,38 +165,56 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType()); assertEquals(1, channel.getShuffleKeys().length); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @Test public final void testCreateColumnPartitionedTableWithSelectedColumns() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithSelectedColumns"); - ResultSet res = executeString( + + if (nodeType == NodeType.INSERT) { + res = executeString( "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) "); - res.close(); + res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); - assertEquals(4, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(4, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); - res = executeString("insert overwrite into " + tableName + " (col1, col2, key) select l_orderkey, " + + res = executeString("insert overwrite into " + tableName + " (col1, col2, key) select l_orderkey, " + "l_partkey, l_quantity from lineitem"); + } else { + res = executeString("create table " + tableName + " (col1 int4, col2 int4, null_col int4)" + + " partition by column(key float8) AS select l_orderkey, l_partkey, null, l_quantity from lineitem"); + } res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); } @Test public final void testColumnPartitionedTableByOneColumn() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumn"); - ResultSet res = executeString( + + if (nodeType == NodeType.INSERT) { + res = executeString( "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) "); - res.close(); + res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - res = executeString("insert overwrite into " + tableName + res = executeString("insert overwrite into " + tableName + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem"); + } else { + res = executeString("create table " + tableName + " (col1 int4, col2 int4, null_col int4) " + + " partition by column(key float8) as select l_orderkey, l_partkey, null, l_quantity from lineitem"); + } res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); assertPartitionDirectories(desc); @@ -197,6 +230,7 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(1)); assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(2)); } + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @@ -216,16 +250,23 @@ public class TestTablePartitions extends QueryTestCaseBase { @Test public final void testQueryCasesOnColumnPartitionedTable() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testQueryCasesOnColumnPartitionedTable"); - ResultSet res = executeString( + + if (nodeType == NodeType.INSERT) { + res = executeString( "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) "); - res.close(); + res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - res = executeString( + res = executeString( "insert overwrite into " + tableName - + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem"); + + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem"); + } else { + res = executeString("create table " + tableName + " (col1 int4, col2 int4, null_col int4) " + + " partition by column(key float8) as select l_orderkey, l_partkey, null, l_quantity from lineitem"); + } res.close(); TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); @@ -292,20 +333,31 @@ public class TestTablePartitions extends QueryTestCaseBase { res = executeFile("case13.sql"); assertResultSet(res, "case13.result"); res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + res.close(); } @Test public final void testColumnPartitionedTableByThreeColumns() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByThreeColumns"); - ResultSet res = testBase.execute( + + if (nodeType == NodeType.INSERT) { + res = testBase.execute( "create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4, col3 float8) "); - res.close(); - TajoTestingCluster cluster = testBase.getTestingCluster(); - CatalogService catalog = cluster.getMaster().getCatalog(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + res.close(); + TajoTestingCluster cluster = testBase.getTestingCluster(); + CatalogService catalog = cluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - res = executeString("insert overwrite into " + tableName + res = executeString("insert overwrite into " + tableName + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem"); + } else { + res = executeString( "create table " + tableName + " (col4 text) " + + " partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag, l_orderkey, l_partkey, " + + "l_quantity from lineitem"); + } res.close(); TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); @@ -342,7 +394,6 @@ public class TestTablePartitions extends QueryTestCaseBase { } res.close(); - Map<Double, int []> resultRows2 = Maps.newHashMap(); resultRows2.put(49.0d, new int[]{3, 3}); resultRows2.put(45.0d, new int[]{3, 2}); @@ -355,21 +406,31 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2)); assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3)); } + + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @Test public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoColumnPartitionedTableByThreeColumns"); - ResultSet res = testBase.execute( + + if (nodeType == NodeType.INSERT) { + res = testBase.execute( "create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4, col3 float8) "); - res.close(); - TajoTestingCluster cluster = testBase.getTestingCluster(); - CatalogService catalog = cluster.getMaster().getCatalog(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + res.close(); + TajoTestingCluster cluster = testBase.getTestingCluster(); + CatalogService catalog = cluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - res = executeString("insert into " + tableName + res = executeString("insert into " + tableName + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem"); + } else { + res = executeString( "create table " + tableName + " (col4 text) " + + " partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag, l_orderkey, l_partkey, " + + "l_quantity from lineitem"); + } res.close(); TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); @@ -405,7 +466,6 @@ public class TestTablePartitions extends QueryTestCaseBase { } res.close(); - Map<Double, int []> resultRows2 = Maps.newHashMap(); resultRows2.put(49.0d, new int[]{3, 3}); resultRows2.put(45.0d, new int[]{3, 2}); @@ -534,21 +594,33 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(summary.getDirectoryCount(), 1L); assertEquals(summary.getFileCount(), 0L); assertEquals(summary.getLength(), 0L); + + executeString("DROP TABLE " + tableName + " PURGE").close(); } @Test public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumnsWithCompression"); - ResultSet res = executeString( + + if (nodeType == NodeType.INSERT) { + res = executeString( "create table " + tableName + " (col2 int4, col3 float8) USING csv " + - "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + - "PARTITION BY column(col1 int4)"); - res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + + "PARTITION BY column(col1 int4)"); + res.close(); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - res = executeString( + res = executeString( "insert overwrite into " + tableName + " select l_partkey, l_quantity, l_orderkey from lineitem"); + } else { + res = executeString( + "create table " + tableName + " (col2 int4, col3 float8) USING csv " + + "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + + "PARTITION BY column(col1 int4) as select l_partkey, l_quantity, l_orderkey from lineitem"); + } res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); if (!testingCluster.isHiveCatalogStoreRunning()) { assertEquals(5, desc.getStats().getNumRows().intValue()); @@ -570,22 +642,34 @@ public class TestTablePartitions extends QueryTestCaseBase { assertTrue(codec instanceof DeflateCodec); } } + + executeString("DROP TABLE " + tableName + " PURGE").close(); } @Test public final void testColumnPartitionedTableByTwoColumnsWithCompression() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByTwoColumnsWithCompression"); - ResultSet res = executeString("create table " + tableName + " (col3 float8, col4 text) USING csv " + + + if (nodeType == NodeType.INSERT) { + res = executeString("create table " + tableName + " (col3 float8, col4 text) USING csv " + "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + "PARTITION by column(col1 int4, col2 int4)"); - res.close(); + res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - res = executeString( + res = executeString( "insert overwrite into " + tableName + - " select l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem"); + " select l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem"); + } else { + res = executeString("create table " + tableName + " (col3 float8, col4 text) USING csv " + + "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + + "PARTITION by column(col1 int4, col2 int4) as select l_quantity, l_returnflag, l_orderkey, " + + "l_partkey from lineitem"); + } res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); if (!testingCluster.isHiveCatalogStoreRunning()) { assertEquals(5, desc.getStats().getNumRows().intValue()); @@ -614,23 +698,35 @@ public class TestTablePartitions extends QueryTestCaseBase { } } } + + executeString("DROP TABLE " + tableName + " PURGE").close(); } @Test public final void testColumnPartitionedTableByThreeColumnsWithCompression() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByThreeColumnsWithCompression"); - ResultSet res = executeString( + + if (nodeType == NodeType.INSERT) { + res = executeString( "create table " + tableName + " (col4 text) USING csv " + - "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + - "partition by column(col1 int4, col2 int4, col3 float8)"); - res.close(); + "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + + "partition by column(col1 int4, col2 int4, col3 float8)"); + res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - res = executeString( + res = executeString( "insert overwrite into " + tableName + - " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem"); + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem"); + } else { + res = executeString("create table " + tableName + " (col4 text) USING csv " + + "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + + "partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag, l_orderkey, l_partkey, " + + "l_quantity from lineitem"); + } res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); if (!testingCluster.isHiveCatalogStoreRunning()) { assertEquals(5, desc.getStats().getNumRows().intValue()); @@ -697,23 +793,35 @@ public class TestTablePartitions extends QueryTestCaseBase { res.close(); assertEquals(3, i); + + executeString("DROP TABLE " + tableName + " PURGE").close(); } @Test public final void testColumnPartitionedTableNoMatchedPartition() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableNoMatchedPartition"); - ResultSet res = executeString( + + if (nodeType == NodeType.INSERT) { + res = executeString( "create table " + tableName + " (col4 text) USING csv " + - "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + - "partition by column(col1 int4, col2 int4, col3 float8)"); - res.close(); + "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + + "partition by column(col1 int4, col2 int4, col3 float8)"); + res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - res = executeString( + res = executeString( "insert overwrite into " + tableName + - " select l_returnflag , l_orderkey, l_partkey, l_quantity from lineitem"); + " select l_returnflag , l_orderkey, l_partkey, l_quantity from lineitem"); + } else { + res = executeString("create table " + tableName + " (col4 text) USING csv " + + "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + + "partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag , l_orderkey, l_partkey, " + + "l_quantity from lineitem"); + } res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); if (!testingCluster.isHiveCatalogStoreRunning()) { assertEquals(5, desc.getStats().getNumRows().intValue()); @@ -753,12 +861,15 @@ public class TestTablePartitions extends QueryTestCaseBase { res = executeString("select * from " + tableName + " where col2 = 9"); assertFalse(res.next()); res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); } @Test public final void testColumnPartitionedTableWithSmallerExpressions1() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions1"); - ResultSet res = executeString( + res = executeString( "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) "); res.close(); @@ -773,26 +884,35 @@ public class TestTablePartitions extends QueryTestCaseBase { res = executeFile("case14.sql"); assertResultSet(res, "case14.result"); res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); } @Test public final void testColumnPartitionedTableWithSmallerExpressions2() throws Exception { + ResultSet res = null; + ClientProtos.SubmitQueryResponse response = null; String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions2"); - ResultSet res = executeString( + + if (nodeType == NodeType.INSERT) { + res = executeString( "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by column(key float8) "); - res.close(); + res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - ClientProtos.SubmitQueryResponse response = client.executeQuery("insert overwrite into " + tableName + response = client.executeQuery("insert overwrite into " + tableName + " select l_returnflag , l_orderkey, l_partkey from lineitem"); - assertTrue(response.hasErrorMessage()); - assertEquals(response.getErrorMessage(), "INSERT has smaller expressions than target columns\n"); + assertTrue(response.hasErrorMessage()); + assertEquals(response.getErrorMessage(), "INSERT has smaller expressions than target columns\n"); - res = executeFile("case15.sql"); - assertResultSet(res, "case15.result"); - res.close(); + res = executeFile("case15.sql"); + assertResultSet(res, "case15.result"); + res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + } } @@ -803,68 +923,104 @@ public class TestTablePartitions extends QueryTestCaseBase { res = executeString("create database testinsertquery2;"); res.close(); - res = executeString("create table testinsertquery1.table1 " + + if (nodeType == NodeType.INSERT) { + res = executeString("create table testinsertquery1.table1 " + "(col1 int4, col2 int4, col3 float8)"); - res.close(); + res.close(); - res = executeString("create table testinsertquery2.table1 " + + res = executeString("create table testinsertquery2.table1 " + "(col1 int4, col2 int4, col3 float8)"); - res.close(); + res.close(); - CatalogService catalog = testingCluster.getMaster().getCatalog(); - assertTrue(catalog.existsTable("testinsertquery1", "table1")); - assertTrue(catalog.existsTable("testinsertquery2", "table1")); + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable("testinsertquery1", "table1")); + assertTrue(catalog.existsTable("testinsertquery2", "table1")); - res = executeString("insert overwrite into testinsertquery1.table1 " + + res = executeString("insert overwrite into testinsertquery1.table1 " + "select l_orderkey, l_partkey, l_quantity from default.lineitem;"); - res.close(); + res.close(); + } else { + res = executeString("create table testinsertquery1.table1 " + + "(col1 int4, col2 int4, col3 float8) as select l_orderkey, l_partkey, l_quantity from default.lineitem;"); + res.close(); + } TableDesc desc = catalog.getTableDesc("testinsertquery1", "table1"); if (!testingCluster.isHiveCatalogStoreRunning()) { assertEquals(5, desc.getStats().getNumRows().intValue()); } - res = executeString("insert overwrite into testinsertquery2.table1 " + + if (nodeType == NodeType.INSERT) { + res = executeString("insert overwrite into testinsertquery2.table1 " + "select col1, col2, col3 from testinsertquery1.table1;"); - res.close(); - + res.close(); + } else { + res = executeString("create table testinsertquery2.table1 " + + "(col1 int4, col2 int4, col3 float8) as select col1, col2, col3 from testinsertquery1.table1;"); + res.close(); + } desc = catalog.getTableDesc("testinsertquery2", "table1"); if (!testingCluster.isHiveCatalogStoreRunning()) { assertEquals(5, desc.getStats().getNumRows().intValue()); } + + executeString("DROP TABLE testinsertquery1.table1 PURGE").close(); + executeString("DROP TABLE testinsertquery2.table1 PURGE").close(); + executeString("DROP DATABASE testinsertquery1").close(); + executeString("DROP DATABASE testinsertquery2").close(); } @Test public final void testColumnPartitionedTableWithSmallerExpressions5() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions5"); - ResultSet res = executeString( + + if (nodeType == NodeType.INSERT) { + res = executeString( "create table " + tableName + " (col1 text) partition by column(col2 text) "); - res.close(); + res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - res = executeString("insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem"); + res = executeString("insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem"); + + } else { + res = executeString("create table " + tableName + " (col1 text) partition by column(col2 text) " + + " as select l_returnflag, null from lineitem"); + } res.close(); res = executeString("select * from " + tableName); assertResultSet(res); res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); } @Test public final void testColumnPartitionedTableWithSmallerExpressions6() throws Exception { + ResultSet res = null; String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions6"); - ResultSet res = executeString( + + if (nodeType == NodeType.INSERT) { + res = executeString( "create table " + tableName + " (col1 text) partition by column(col2 text) "); - res.close(); + res.close(); - assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); - res = executeString( + res = executeString( "insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem where l_orderkey = 1"); + } else { + res = executeString( "create table " + tableName + " (col1 text) partition by column(col2 text) " + + " as select l_returnflag, null from lineitem where l_orderkey = 1"); + } res.close(); + res = executeString("select * from " + tableName); assertResultSet(res); res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); } private MasterPlan getQueryPlan(ResultSet res) { @@ -924,8 +1080,13 @@ public class TestTablePartitions extends QueryTestCaseBase { CatalogService catalog = testingCluster.getMaster().getCatalog(); assertTrue(catalog.existsTable("default", "testscatteredhashshuffle")); - executeString("create table test_partition (col2 text) partition by column (col1 text)").close(); - executeString("insert into test_partition select col2, col1 from testscatteredhashshuffle").close(); + if (nodeType == NodeType.INSERT) { + executeString("create table test_partition (col2 text) partition by column (col1 text)").close(); + executeString("insert into test_partition select col2, col1 from testscatteredhashshuffle").close(); + } else { + executeString("create table test_partition (col2 text) PARTITION BY COLUMN (col1 text) AS select col2, " + + "col1 from testscatteredhashshuffle").close(); + } ResultSet res = executeString("select col1 from test_partition"); @@ -935,8 +1096,6 @@ public class TestTablePartitions extends QueryTestCaseBase { } assertEquals(data.size(), numRows); - // assert data file size - } finally { testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname, TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.defaultVal); @@ -953,10 +1112,17 @@ public class TestTablePartitions extends QueryTestCaseBase { executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl"); - executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)") + if (nodeType == NodeType.INSERT) { + executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)") .close(); - executeString("INSERT OVERWRITE INTO pTable947 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial") + executeString("INSERT OVERWRITE INTO pTable947 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial") .close(); + } else { + executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)" + + " AS SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial") + .close(); + } + ResultSet res = executeString("select * from pTable947 where type='RA:*?><I/L#%S' or type='AIR'"); String resStr = resultSetToString(res); @@ -968,6 +1134,8 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(expected, resStr); cleanupQuery(res); + + executeString("DROP TABLE pTable947 PURGE").close(); } @Test @@ -976,10 +1144,16 @@ public class TestTablePartitions extends QueryTestCaseBase { executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl"); - executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)") + if (nodeType == NodeType.INSERT) { + executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)") .close(); - executeString("INSERT OVERWRITE INTO pTable948 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial") + executeString("INSERT OVERWRITE INTO pTable948 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial") .close(); + } else { + executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)" + + " AS SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial") + .close(); + } ResultSet res = executeString("select * from pTable948 where type='RA:*?><I/L#%S'"); assertResultSet(res); @@ -988,6 +1162,8 @@ public class TestTablePartitions extends QueryTestCaseBase { res = executeString("select * from pTable948 where type='RA:*?><I/L#%S' or type='AIR01'"); assertResultSet(res); cleanupQuery(res); + + executeString("DROP TABLE pTable948 PURGE").close(); } @Test @@ -995,19 +1171,22 @@ public class TestTablePartitions extends QueryTestCaseBase { // See - TAJO-1219: Files located in intermediate directories of partitioned table should be ignored // It verifies that Tajo ignores files located in intermediate directories of partitioned table. - Path testDir = CommonTestingUtil.getTestDir(); + if (nodeType == NodeType.INSERT) { + Path testDir = CommonTestingUtil.getTestDir(); - executeString( - "CREATE EXTERNAL TABLE testIgnoreFilesInIntermediateDir (col1 int) USING CSV PARTITION BY COLUMN (col2 text) " + - "LOCATION '" + testDir + "'"); + executeString( + "CREATE EXTERNAL TABLE testIgnoreFilesInIntermediateDir (col1 int) USING CSV PARTITION BY COLUMN (col2 text) " + + "LOCATION '" + testDir + "'"); - FileSystem fs = testDir.getFileSystem(conf); - FSDataOutputStream fos = fs.create(new Path(testDir, "table1.data")); - fos.write("a|b|c".getBytes()); - fos.close(); + FileSystem fs = testDir.getFileSystem(conf); + FSDataOutputStream fos = fs.create(new Path(testDir, "table1.data")); + fos.write("a|b|c".getBytes()); + fos.close(); - ResultSet res = executeString("select * from testIgnoreFilesInIntermediateDir;"); - assertFalse(res.next()); - res.close(); + ResultSet res = executeString("select * from testIgnoreFilesInIntermediateDir;"); + assertFalse(res.next()); + res.close(); + } } + }
