TAJO-652: logical planner cannot handle alias on partition columns. (Hyoungjun Kim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cbe1d6e9 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cbe1d6e9 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cbe1d6e9 Branch: refs/heads/window_function Commit: cbe1d6e94ed7e3bdcd30d0414815736621b6b8df Parents: bbf2461 Author: Hyunsik Choi <[email protected]> Authored: Mon Apr 21 18:24:41 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Apr 21 18:26:11 2014 +0900 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../apache/tajo/engine/planner/LogicalPlan.java | 230 ++++++++++--------- .../tajo/engine/query/TestTablePartitions.java | 15 ++ .../queries/TestTablePartitions/case11.sql | 1 + .../queries/TestTablePartitions/case12.sql | 7 + .../queries/TestTablePartitions/case13.sql | 11 + .../results/TestTablePartitions/case11.result | 7 + .../results/TestTablePartitions/case12.result | 7 + .../results/TestTablePartitions/case13.result | 7 + .../org/apache/tajo/rpc/NettyServerBase.java | 3 + 10 files changed, 184 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/cbe1d6e9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7f1b175..eaf81a5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -336,6 +336,9 @@ Release 0.8.0 - unreleased BUG FIXES + TAJO-652: logical planner cannot handle alias on partition columns. + (Hyoungjun Kim via hyunsik) + TAJO-675: maximum frame size of frameDecoder should be increased. (jinho) TAJO-748: Shuffle output numbers of join may be inconsistent. (jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/cbe1d6e9/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java index 98fbf42..6be0c6a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java @@ -286,105 +286,86 @@ public class LogicalPlan { * It resolves a column. */ public Column resolveColumn(QueryBlock block, ColumnReferenceExpr columnRef) throws PlanningException { + if (columnRef.hasQualifier()) { + return resolveColumnWithQualifier(block, columnRef); + } else { + return resolveColumnWithoutQualifier(block, columnRef); + } + } - if (columnRef.hasQualifier()) { // if a column reference is qualified - - String qualifier; - String canonicalName; - String qualifiedName; + private Column resolveColumnWithQualifier(QueryBlock block, ColumnReferenceExpr columnRef) throws PlanningException { + String qualifier; + String canonicalName; + String qualifiedName; - if (CatalogUtil.isFQTableName(columnRef.getQualifier())) { - qualifier = columnRef.getQualifier(); - canonicalName = columnRef.getCanonicalName(); - } else { - String resolvedDatabaseName = resolveDatabase(block, columnRef.getQualifier()); - if (resolvedDatabaseName == null) { - throw new NoSuchColumnException(columnRef.getQualifier()); - } - qualifier = CatalogUtil.buildFQName(resolvedDatabaseName, columnRef.getQualifier()); - canonicalName = CatalogUtil.buildFQName(qualifier, columnRef.getName()); + if (CatalogUtil.isFQTableName(columnRef.getQualifier())) { + qualifier = columnRef.getQualifier(); + canonicalName = columnRef.getCanonicalName(); + } else { + String resolvedDatabaseName = resolveDatabase(block, columnRef.getQualifier()); + if (resolvedDatabaseName == null) { + throw new NoSuchColumnException(columnRef.getQualifier()); } - qualifiedName = CatalogUtil.buildFQName(qualifier, columnRef.getName()); + qualifier = CatalogUtil.buildFQName(resolvedDatabaseName, columnRef.getQualifier()); + canonicalName = CatalogUtil.buildFQName(qualifier, columnRef.getName()); + } + qualifiedName = CatalogUtil.buildFQName(qualifier, columnRef.getName()); - RelationNode relationOp = block.getRelation(qualifier); + RelationNode relationOp = block.getRelation(qualifier); - // if a column name is outside of this query block - if (relationOp == null) { - // TODO - nested query can only refer outer query block? or not? - for (QueryBlock eachBlock : queryBlocks.values()) { - if (eachBlock.existsRelation(qualifier)) { - relationOp = eachBlock.getRelation(qualifier); - } + // if a column name is outside of this query block + if (relationOp == null) { + // TODO - nested query can only refer outer query block? or not? + for (QueryBlock eachBlock : queryBlocks.values()) { + if (eachBlock.existsRelation(qualifier)) { + relationOp = eachBlock.getRelation(qualifier); } } + } - // If we cannot find any relation against a qualified column name - if (relationOp == null) { - throw new NoSuchColumnException(canonicalName); - } - - if (block.isAlreadyRenamedTableName(CatalogUtil.extractQualifier(canonicalName))) { - String changedName = CatalogUtil.buildFQName( - relationOp.getCanonicalName(), - CatalogUtil.extractSimpleName(canonicalName)); - canonicalName = changedName; - } - - Schema schema = relationOp.getTableSchema(); - Column column = schema.getColumn(canonicalName); - if (column == null) { - throw new NoSuchColumnException(canonicalName); - } + // If we cannot find any relation against a qualified column name + if (relationOp == null) { + throw new NoSuchColumnException(canonicalName); + } - // If code reach here, a column is found. - // But, it may be aliased from bottom logical node. - // If the column is aliased, the found name may not be used in upper node. - - // Here, we try to check if column reference is already aliased. - // If so, it replaces the name with aliased name. - LogicalNode currentNode = block.getCurrentNode(); - - // The condition (currentNode.getInSchema().contains(column)) means - // the column can be used at the current node. So, we don't need to find aliase name. - if (currentNode != null && !currentNode.getInSchema().contains(column) - && currentNode.getType() != NodeType.TABLE_SUBQUERY) { - List<Column> candidates = TUtil.newList(); - if (block.namedExprsMgr.isAliased(qualifiedName)) { - String alias = block.namedExprsMgr.getAlias(canonicalName); - Column found = resolveColumn(block, new ColumnReferenceExpr(alias)); - if (found != null) { - candidates.add(found); - } - } - if (!candidates.isEmpty()) { - return ensureUniqueColumn(candidates); - } - } + if (block.isAlreadyRenamedTableName(CatalogUtil.extractQualifier(canonicalName))) { + String changedName = CatalogUtil.buildFQName( + relationOp.getCanonicalName(), + CatalogUtil.extractSimpleName(canonicalName)); + canonicalName = changedName; + } - return column; - } else { // if a column reference is not qualified + Schema schema = relationOp.getTableSchema(); + Column column = schema.getColumn(canonicalName); + if (column == null) { + throw new NoSuchColumnException(canonicalName); + } - // Trying to find the column within the current block + // If code reach here, a column is found. + // But, it may be aliased from bottom logical node. + // If the column is aliased, the found name may not be used in upper node. - if (block.currentNode != null && block.currentNode.getInSchema() != null) { - Column found = block.currentNode.getInSchema().getColumn(columnRef.getCanonicalName()); - if (found != null) { - return found; - } - } + // Here, we try to check if column reference is already aliased. + // If so, it replaces the name with aliased name. + LogicalNode currentNode = block.getCurrentNode(); - if (block.getLatestNode() != null) { - Column found = block.getLatestNode().getOutSchema().getColumn(columnRef.getName()); - if (found != null) { - return found; - } + // The condition (currentNode.getInSchema().contains(column)) means + // the column can be used at the current node. So, we don't need to find aliase name. + Schema currentNodeSchema = null; + if (currentNode != null) { + if (currentNode instanceof RelationNode) { + currentNodeSchema = ((RelationNode) currentNode).getTableSchema(); + } else { + currentNodeSchema = currentNode.getInSchema(); } + } + if (currentNode != null && !currentNodeSchema.contains(column) + && currentNode.getType() != NodeType.TABLE_SUBQUERY) { List<Column> candidates = TUtil.newList(); - // Trying to find columns from aliased references. - if (block.namedExprsMgr.isAliased(columnRef.getCanonicalName())) { - String originalName = block.namedExprsMgr.getAlias(columnRef.getCanonicalName()); - Column found = resolveColumn(block, new ColumnReferenceExpr(originalName)); + if (block.namedExprsMgr.isAliased(qualifiedName)) { + String alias = block.namedExprsMgr.getAlias(canonicalName); + Column found = resolveColumn(block, new ColumnReferenceExpr(alias)); if (found != null) { candidates.add(found); } @@ -392,47 +373,80 @@ public class LogicalPlan { if (!candidates.isEmpty()) { return ensureUniqueColumn(candidates); } + } - // Trying to find columns from other relations in the current block - for (RelationNode rel : block.getRelations()) { - Column found = rel.getTableSchema().getColumn(columnRef.getName()); - if (found != null) { - candidates.add(found); - } + return column; + } + + private Column resolveColumnWithoutQualifier(QueryBlock block, + ColumnReferenceExpr columnRef)throws PlanningException { + // Trying to find the column within the current block + if (block.currentNode != null && block.currentNode.getInSchema() != null) { + Column found = block.currentNode.getInSchema().getColumn(columnRef.getCanonicalName()); + if (found != null) { + return found; } + } - if (!candidates.isEmpty()) { - return ensureUniqueColumn(candidates); + if (block.getLatestNode() != null) { + Column found = block.getLatestNode().getOutSchema().getColumn(columnRef.getName()); + if (found != null) { + return found; } + } - // Trying to find columns from other relations in other blocks - for (QueryBlock eachBlock : queryBlocks.values()) { - for (RelationNode rel : eachBlock.getRelations()) { - Column found = rel.getTableSchema().getColumn(columnRef.getName()); - if (found != null) { - candidates.add(found); - } - } + List<Column> candidates = TUtil.newList(); + // Trying to find columns from aliased references. + if (block.namedExprsMgr.isAliased(columnRef.getCanonicalName())) { + String originalName = block.namedExprsMgr.getAlias(columnRef.getCanonicalName()); + Column found = resolveColumn(block, new ColumnReferenceExpr(originalName)); + if (found != null) { + candidates.add(found); } + } + if (!candidates.isEmpty()) { + return ensureUniqueColumn(candidates); + } - if (!candidates.isEmpty()) { - return ensureUniqueColumn(candidates); + // Trying to find columns from other relations in the current block + for (RelationNode rel : block.getRelations()) { + Column found = rel.getTableSchema().getColumn(columnRef.getName()); + if (found != null) { + candidates.add(found); } + } - // Trying to find columns from schema in current block. - if (block.getSchema() != null) { - Column found = block.getSchema().getColumn(columnRef.getName()); + if (!candidates.isEmpty()) { + return ensureUniqueColumn(candidates); + } + + // Trying to find columns from other relations in other blocks + for (QueryBlock eachBlock : queryBlocks.values()) { + for (RelationNode rel : eachBlock.getRelations()) { + Column found = rel.getTableSchema().getColumn(columnRef.getName()); if (found != null) { candidates.add(found); } } + } - if (!candidates.isEmpty()) { - return ensureUniqueColumn(candidates); + if (!candidates.isEmpty()) { + return ensureUniqueColumn(candidates); + } + + // Trying to find columns from schema in current block. + if (block.getSchema() != null) { + Column found = block.getSchema().getColumn(columnRef.getName()); + if (found != null) { + candidates.add(found); } + } - throw new VerifyException("ERROR: no such a column name "+ columnRef.getCanonicalName()); + if (!candidates.isEmpty()) { + return ensureUniqueColumn(candidates); } + + throw new VerifyException("ERROR: no such a column name "+ columnRef.getCanonicalName()); } private static Column ensureUniqueColumn(List<Column> candidates) @@ -676,6 +690,7 @@ public class LogicalPlan { } } + @SuppressWarnings("unchecked") public <T extends LogicalNode> T getCurrentNode() { return (T) this.currentNode; } @@ -722,6 +737,7 @@ public class LogicalPlan { queryBlockByPID.put(node.getPID(), this); } + @SuppressWarnings("unchecked") public <T extends LogicalNode> T getNode(NodeType nodeType) { return (T) nodeTypeToNodeMap.get(nodeType); } http://git-wip-us.apache.org/repos/asf/tajo/blob/cbe1d6e9/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 9cc2410..0ec7de0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -187,6 +187,21 @@ public class TestTablePartitions extends QueryTestCaseBase { res = executeFile("case10.sql"); assertResultSet(res, "case10.result"); res.close(); + + // alias partition column + res = executeFile("case11.sql"); + assertResultSet(res, "case11.result"); + res.close(); + + // alias partition column in group by, order by + res = executeFile("case12.sql"); + assertResultSet(res, "case12.result"); + res.close(); + + // alias partition column in subquery + res = executeFile("case13.sql"); + assertResultSet(res, "case13.result"); + res.close(); } @Test http://git-wip-us.apache.org/repos/asf/tajo/blob/cbe1d6e9/tajo-core/src/test/resources/queries/TestTablePartitions/case11.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestTablePartitions/case11.sql b/tajo-core/src/test/resources/queries/TestTablePartitions/case11.sql new file mode 100644 index 0000000..a5bf1db --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestTablePartitions/case11.sql @@ -0,0 +1 @@ +select key as key_alias from testQueryCasesOnColumnPartitionedTable \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/cbe1d6e9/tajo-core/src/test/resources/queries/TestTablePartitions/case12.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestTablePartitions/case12.sql b/tajo-core/src/test/resources/queries/TestTablePartitions/case12.sql new file mode 100644 index 0000000..2324439 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestTablePartitions/case12.sql @@ -0,0 +1,7 @@ +select + key as key_alias, + count(*) cnt +from + testQueryCasesOnColumnPartitionedTable +group by key_alias +order by key_alias desc \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/cbe1d6e9/tajo-core/src/test/resources/queries/TestTablePartitions/case13.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestTablePartitions/case13.sql b/tajo-core/src/test/resources/queries/TestTablePartitions/case13.sql new file mode 100644 index 0000000..efb7432 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestTablePartitions/case13.sql @@ -0,0 +1,11 @@ +select + key_alias as key, cnt +from ( + select + key as key_alias, + count(*) cnt + from + testQueryCasesOnColumnPartitionedTable + group by key_alias + order by key_alias desc +) a \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/cbe1d6e9/tajo-core/src/test/resources/results/TestTablePartitions/case11.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestTablePartitions/case11.result b/tajo-core/src/test/resources/results/TestTablePartitions/case11.result new file mode 100644 index 0000000..95750fd --- /dev/null +++ b/tajo-core/src/test/resources/results/TestTablePartitions/case11.result @@ -0,0 +1,7 @@ +key_alias +------------------------------- +17.0 +36.0 +38.0 +45.0 +49.0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/cbe1d6e9/tajo-core/src/test/resources/results/TestTablePartitions/case12.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestTablePartitions/case12.result b/tajo-core/src/test/resources/results/TestTablePartitions/case12.result new file mode 100644 index 0000000..ac0f9b7 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestTablePartitions/case12.result @@ -0,0 +1,7 @@ +key_alias,cnt +------------------------------- +49.0,1 +45.0,1 +38.0,1 +36.0,1 +17.0,1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/cbe1d6e9/tajo-core/src/test/resources/results/TestTablePartitions/case13.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestTablePartitions/case13.result b/tajo-core/src/test/resources/results/TestTablePartitions/case13.result new file mode 100644 index 0000000..5c92e79 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestTablePartitions/case13.result @@ -0,0 +1,7 @@ +key,cnt +------------------------------- +49.0,1 +45.0,1 +38.0,1 +36.0,1 +17.0,1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/cbe1d6e9/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java index 8f98d3a..9ee098d 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java @@ -25,6 +25,7 @@ import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.DefaultChannelFuture; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.DefaultChannelGroup; @@ -67,6 +68,8 @@ public class NettyServerBase { public void init(ChannelPipelineFactory pipeline, int workerNum) { ChannelFactory factory = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum); + DefaultChannelFuture.setUseDeadLockChecker(false); + pipelineFactory = pipeline; bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(pipelineFactory);
