Repository: tajo Updated Branches: refs/heads/index_support 60cbe9cac -> d09bd8ddc
TAJO-1368: Exceptions during processing nested union queries Closes #402 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/725448c5 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/725448c5 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/725448c5 Branch: refs/heads/index_support Commit: 725448c5249cd8691ea167f595237ed7bcc22293 Parents: a9ae3ca Author: Jihun Kang <[email protected]> Authored: Thu Mar 19 09:24:55 2015 +0900 Committer: Jihun Kang <[email protected]> Committed: Thu Mar 19 09:24:55 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../engine/planner/global/GlobalPlanner.java | 39 +++++++++--- .../java/org/apache/tajo/querymaster/Stage.java | 3 +- .../apache/tajo/engine/query/TestCTASQuery.java | 28 +++++++++ .../tajo/engine/query/TestUnionQuery.java | 64 +++++++++++++++++++- .../TestCTASQuery/CtasWithMultipleUnions.sql | 12 ++++ .../testCtasWithMultipleUnions.sql | 1 + 7 files changed, 140 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 4875cab..56d77b3 100644 --- a/CHANGES +++ b/CHANGES @@ -32,6 +32,9 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1368: Exceptions during processing nested union queries. + (jihun) + TAJO-1405: Fix some illegal way of usages on connection pool. 
(Contributed by navis, Committed by Keuntae Park) http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 15d8034..d2ac6cc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -821,7 +822,7 @@ public class GlobalPlanner { public static boolean hasUnionChild(UnaryNode node) { - // there are two cases: + // there are three cases: // // The first case is: // @@ -835,9 +836,15 @@ public class GlobalPlanner { // select avg(..) from (select ... UNION select ) T // // We can generalize this case as 'a shuffle required operator on the top of union'. + // + // The third case is: + // + // create table ... as select * from ( select ... ) a union all select * from ( select ... 
) b - if (node.getChild() instanceof UnaryNode) { // first case - UnaryNode child = node.getChild(); + LogicalNode childNode = node.getChild(); + + if (childNode instanceof UnaryNode) { // first case + UnaryNode child = (UnaryNode) childNode; if (child.getChild().getType() == NodeType.PROJECTION) { child = child.getChild(); @@ -848,9 +855,11 @@ public class GlobalPlanner { return tableSubQuery.getSubQuery().getType() == NodeType.UNION; } - } else if (node.getChild().getType() == NodeType.TABLE_SUBQUERY) { // second case + } else if (childNode.getType() == NodeType.TABLE_SUBQUERY) { // second case TableSubQueryNode tableSubQuery = node.getChild(); return tableSubQuery.getSubQuery().getType() == NodeType.UNION; + } else if (childNode.getType() == NodeType.UNION) { // third case + return true; } return false; @@ -1156,6 +1165,9 @@ public class GlobalPlanner { ((TableSubQueryNode)child).getSubQuery().getType() == NodeType.UNION) { MasterPlan masterPlan = context.plan; for (DataChannel dataChannel : masterPlan.getIncomingChannels(execBlock.getId())) { + // This data channel will be stored in staging directory, but RawFile, default file type, does not support + // distributed file system. It needs to change the file format for distributed file system. 
+ dataChannel.setStoreType(CatalogProtos.StoreType.CSV); ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId()); ProjectionNode copy = PlannerUtil.clone(plan, node); @@ -1371,18 +1383,28 @@ public class GlobalPlanner { LogicalPlan.QueryBlock rightQueryBlock = plan.getBlock(node.getRightChild()); LogicalNode rightChild = visit(context, plan, rightQueryBlock, rightQueryBlock.getRoot(), stack); stack.pop(); + + MasterPlan masterPlan = context.getPlan(); List<ExecutionBlock> unionBlocks = Lists.newArrayList(); List<ExecutionBlock> queryBlockBlocks = Lists.newArrayList(); ExecutionBlock leftBlock = context.execBlockMap.remove(leftChild.getPID()); ExecutionBlock rightBlock = context.execBlockMap.remove(rightChild.getPID()); - if (leftChild.getType() == NodeType.UNION) { + + // These union types need to eliminate unnecessary nodes between parent and child node of query tree. + boolean leftUnion = (leftChild.getType() == NodeType.UNION) || + ((leftChild.getType() == NodeType.TABLE_SUBQUERY) && + (((TableSubQueryNode)leftChild).getSubQuery().getType() == NodeType.UNION)); + boolean rightUnion = (rightChild.getType() == NodeType.UNION) || + (rightChild.getType() == NodeType.TABLE_SUBQUERY) && + (((TableSubQueryNode)rightChild).getSubQuery().getType() == NodeType.UNION); + if (leftUnion) { unionBlocks.add(leftBlock); } else { queryBlockBlocks.add(leftBlock); } - if (rightChild.getType() == NodeType.UNION) { + if (rightUnion) { unionBlocks.add(rightBlock); } else { queryBlockBlocks.add(rightBlock); @@ -1396,7 +1418,8 @@ public class GlobalPlanner { } for (ExecutionBlock childBlocks : unionBlocks) { - for (ExecutionBlock grandChildBlock : context.plan.getChilds(childBlocks)) { + for (ExecutionBlock grandChildBlock : masterPlan.getChilds(childBlocks)) { + masterPlan.disconnect(grandChildBlock, childBlocks); queryBlockBlocks.add(grandChildBlock); } } @@ -1404,7 +1427,7 @@ public class GlobalPlanner { for (ExecutionBlock childBlocks : queryBlockBlocks) { 
DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_SHUFFLE, 1); channel.setStoreType(storeType); - context.plan.addConnect(channel); + masterPlan.addConnect(channel); } context.execBlockMap.put(node.getPID(), execBlock); http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 4e1f716..20add9f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -780,7 +780,8 @@ public class Stage implements EventHandler<StageEvent> { try { // Union operator does not require actual query processing. It is performed logically. if (execBlock.hasUnion()) { - stage.finalizeStats(); + // Although the union operator is not actually processed, the stage must still handle its completion event.
+ stage.complete(); state = StageState.SUCCEEDED; } else { ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock()); http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java index e93d214..18c9fbc 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java @@ -182,6 +182,34 @@ public class TestCTASQuery extends QueryTestCaseBase { } @Test + public final void testCtasWithMultipleUnions() throws Exception { + ResultSet res = executeFile("CtasWithMultipleUnions.sql"); + res.close(); + + ResultSet res2 = executeQuery(); + String actual = resultSetToString(res2); + res2.close(); + + String expected = "c_custkey,c_nationkey\n" + + "-------------------------------\n" + + "1,15\n" + + "2,13\n" + + "3,1\n" + + "4,4\n" + + "5,3\n" + + "1,15\n" + + "2,13\n" + + "3,1\n" + + "4,4\n" + + "5,3\n"; + + assertEquals(expected, actual); + + TableDesc desc = client.getTableDesc(CatalogUtil.normalizeIdentifier(res2.getMetaData().getTableName(1))); + assertNotNull(desc); + } + + @Test public final void testCtasWithStoreType() throws Exception { ResultSet res = executeFile("CtasWithStoreType.sql"); res.close(); http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java index d46d110..03a80d1 100644 --- 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java @@ -450,4 +450,66 @@ public class TestUnionQuery extends QueryTestCaseBase { assertEquals(expected, resultSetToString(res)); res.close(); } -} \ No newline at end of file + + @Test + public void testTajo1368Case1() throws Exception { + ResultSet res = executeString( + "select * from " + + " (select c_custkey, c_nationkey from customer where c_nationkey < 0 " + + " union all " + + " select c_custkey, c_nationkey from customer where c_nationkey > 0 " + + ") a " + + "union all " + + "select * from " + + " (select c_custkey, c_nationkey from customer where c_nationkey < 0 " + + " union all " + + " select c_custkey, c_nationkey from customer where c_nationkey > 0 " + + ") b "); + + String expected = "c_custkey,c_nationkey\n" + + "-------------------------------\n" + + "1,15\n" + + "2,13\n" + + "3,1\n" + + "4,4\n" + + "5,3\n" + + "1,15\n" + + "2,13\n" + + "3,1\n" + + "4,4\n" + + "5,3\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + } + + @Test + public void testTajo1368Case2() throws Exception { + ResultSet res = executeString("select * from ( "+ + "select c_custkey, c_nationkey from ( " + + "select c_custkey, c_nationkey from ( " + + "select c_custkey, c_nationkey from customer) a " + + "union all " + + "select c_custkey, c_nationkey from ( " + + "select c_custkey, c_nationkey from customer) a " + + " ) a " + + " ) a "); + + String expected = "c_custkey,c_nationkey\n" + + "-------------------------------\n" + + "1,15\n" + + "2,13\n" + + "3,1\n" + + "4,4\n" + + "5,3\n" + + "1,15\n" + + "2,13\n" + + "3,1\n" + + "4,4\n" + + "5,3\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql 
---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql b/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql new file mode 100644 index 0000000..7176a2a --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithMultipleUnions.sql @@ -0,0 +1,12 @@ +create table testCtasWithMultipleUnions as +select * from + (select c_custkey, c_nationkey from customer where c_nationkey < 0 + union all + select c_custkey, c_nationkey from customer where c_nationkey > 0 +) a +union all +select * from + (select c_custkey, c_nationkey from customer where c_nationkey < 0 + union all + select c_custkey, c_nationkey from customer where c_nationkey > 0 +) b; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/725448c5/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql b/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql new file mode 100644 index 0000000..71b7034 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestCTASQuery/testCtasWithMultipleUnions.sql @@ -0,0 +1 @@ +select * from testCtasWithMultipleUnions; \ No newline at end of file
