This is an automated email from the ASF dual-hosted git repository. vitalii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 502d2977092eecda0a4aa0482b5f96459c315227 Author: Volodymyr Vysotskyi <[email protected]> AuthorDate: Fri May 18 15:54:16 2018 +0300 DRILL-5188: Expand sub-queries using rules - Add check for agg with group by literal - Allow NLJ for limit 1 - Implement single_value aggregate function closes #1321 --- exec/java-exec/src/main/codegen/config.fmpp | 1 + .../src/main/codegen/data/SingleValue.tdd | 62 +++++++++ .../src/main/codegen/templates/SingleValueAgg.java | 144 +++++++++++++++++++++ .../drill/exec/physical/impl/join/JoinUtils.java | 105 ++++++++++++++- .../apache/drill/exec/planner/PlannerPhase.java | 10 ++ .../apache/drill/exec/planner/RuleInstance.java | 10 ++ .../exec/planner/logical/PreProcessLogicalRel.java | 18 +-- .../drill/exec/planner/sql/SqlConverter.java | 8 +- .../planner/sql/handlers/DefaultSqlHandler.java | 12 +- .../java/org/apache/drill/TestCorrelation.java | 46 +++++++ .../apache/drill/TestDisabledFunctionality.java | 37 ------ .../java/org/apache/drill/TestExampleQueries.java | 33 +++++ .../java/org/apache/drill/TestTpchDistributed.java | 1 - .../apache/drill/TestTpchDistributedStreaming.java | 1 - .../java/org/apache/drill/TestTpchExplain.java | 1 - .../test/java/org/apache/drill/TestTpchLimit0.java | 1 - .../java/org/apache/drill/TestTpchPlanning.java | 1 - .../java/org/apache/drill/TestTpchSingleMode.java | 1 - .../drill/exec/fn/impl/TestAggregateFunctions.java | 80 ++++++++++++ pom.xml | 2 +- 20 files changed, 504 insertions(+), 70 deletions(-) diff --git a/exec/java-exec/src/main/codegen/config.fmpp b/exec/java-exec/src/main/codegen/config.fmpp index 50f110d..e233974 100644 --- a/exec/java-exec/src/main/codegen/config.fmpp +++ b/exec/java-exec/src/main/codegen/config.fmpp @@ -43,6 +43,7 @@ data: { intervalNumericTypes: tdd(../data/IntervalNumericTypes.tdd), extract: tdd(../data/ExtractTypes.tdd), sumzero: tdd(../data/SumZero.tdd), + singleValue: tdd(../data/SingleValue.tdd), numericTypes: tdd(../data/NumericTypes.tdd), casthigh: 
tdd(../data/CastHigh.tdd), countAggrTypes: tdd(../data/CountAggrTypes.tdd) diff --git a/exec/java-exec/src/main/codegen/data/SingleValue.tdd b/exec/java-exec/src/main/codegen/data/SingleValue.tdd new file mode 100644 index 0000000..a42fe3b --- /dev/null +++ b/exec/java-exec/src/main/codegen/data/SingleValue.tdd @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +{ +types: [ + {inputType: "Bit", outputType: "NullableBit", runningType: "Bit", major: "primitive"}, + {inputType: "TinyInt", outputType: "NullableTinyInt", runningType: "TinyInt", major: "primitive"}, + {inputType: "NullableTinyInt", outputType: "NullableTinyInt", runningType: "TinyInt", major: "primitive"}, + {inputType: "UInt1", outputType: "NullableUInt1", runningType: "UInt1", major: "primitive"}, + {inputType: "NullableUInt1", outputType: "NullableUInt1", runningType: "UInt1", major: "primitive"}, + {inputType: "UInt2", outputType: "NullableUInt2", runningType: "UInt2", major: "primitive"}, + {inputType: "NullableUInt2", outputType: "NullableUInt2", runningType: "UInt2", major: "primitive"}, + {inputType: "SmallInt", outputType: "NullableSmallInt", runningType: "SmallInt", major: "primitive"}, + {inputType: "NullableSmallInt", outputType: "NullableSmallInt", runningType: "SmallInt", major: "primitive"}, + {inputType: "UInt4", outputType: "NullableUInt4", runningType: "UInt4", major: "primitive"}, + {inputType: "NullableUInt4", outputType: "NullableUInt4", runningType: "UInt4", major: "primitive"}, + {inputType: "UInt8", outputType: "NullableUInt8", runningType: "UInt8", major: "primitive"}, + {inputType: "NullableUInt8", outputType: "NullableUInt8", runningType: "UInt8", major: "primitive"}, + {inputType: "Int", outputType: "NullableInt", runningType: "Int", major: "primitive"}, + {inputType: "BigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "primitive"}, + {inputType: "NullableBit", outputType: "NullableBit", runningType: "Bit", major: "primitive"}, + {inputType: "NullableInt", outputType: "NullableInt", runningType: "Int", major: "primitive"}, + {inputType: "NullableBigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "primitive"}, + {inputType: "Float4", outputType: "NullableFloat4", runningType: "Float4", major: "primitive"}, + {inputType: "Float8", outputType: "NullableFloat8", runningType: "Float8", major: 
"primitive"}, + {inputType: "NullableFloat4", outputType: "NullableFloat4", runningType: "Float4", major: "primitive"}, + {inputType: "NullableFloat8", outputType: "NullableFloat8", runningType: "Float8", major: "primitive"}, + {inputType: "Date", outputType: "NullableDate", runningType: "Date", major: "primitive"}, + {inputType: "NullableDate", outputType: "NullableDate", runningType: "Date", major: "primitive"}, + {inputType: "TimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "primitive"}, + {inputType: "NullableTimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "primitive"}, + {inputType: "Time", outputType: "NullableTime", runningType: "Time", major: "primitive"}, + {inputType: "NullableTime", outputType: "NullableTime", runningType: "Time", major: "primitive"}, + {inputType: "IntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "IntervalDay"}, + {inputType: "NullableIntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "IntervalDay"}, + {inputType: "IntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "primitive"}, + {inputType: "NullableIntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "primitive"}, + {inputType: "Interval", outputType: "NullableInterval", runningType: "Interval", major: "Interval"}, + {inputType: "NullableInterval", outputType: "NullableInterval", runningType: "Interval", major: "Interval"}, + {inputType: "VarDecimal", outputType: "NullableVarDecimal", runningType: "VarDecimal", major: "VarDecimal"}, + {inputType: "NullableVarDecimal", outputType: "NullableVarDecimal", runningType: "VarDecimal", major: "VarDecimal"}, + {inputType: "VarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "bytes"}, + {inputType: "NullableVarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "bytes"}, + {inputType: "Var16Char", outputType: 
"NullableVar16Char", runningType: "Var16Char", major: "bytes"}, + {inputType: "NullableVar16Char", outputType: "NullableVar16Char", runningType: "Var16Char", major: "bytes"}, + {inputType: "VarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "bytes"}, + {inputType: "NullableVarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "bytes"} + ] +} diff --git a/exec/java-exec/src/main/codegen/templates/SingleValueAgg.java b/exec/java-exec/src/main/codegen/templates/SingleValueAgg.java new file mode 100644 index 0000000..c0ff6cf --- /dev/null +++ b/exec/java-exec/src/main/codegen/templates/SingleValueAgg.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +<@pp.dropOutputFile /> + +<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/gaggr/SingleValueFunctions.java" /> + +<#include "/@includes/license.ftl" /> + +package org.apache.drill.exec.expr.fn.impl.gaggr; + +import org.apache.drill.exec.expr.DrillAggFunc; +import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.annotations.Param; +import org.apache.drill.exec.expr.annotations.Workspace; +import org.apache.drill.exec.expr.holders.*; + +import javax.inject.Inject; +import io.netty.buffer.DrillBuf; + +/* + * This class is generated using freemarker and the ${.template_name} template. + */ +@SuppressWarnings("unused") +public class SingleValueFunctions { +<#list singleValue.types as type> + + @FunctionTemplate(name = "single_value", + <#if type.major == "VarDecimal"> + returnType = FunctionTemplate.ReturnType.DECIMAL_AVG_AGGREGATE, + </#if> + scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE) + public static class ${type.inputType}SingleValue implements DrillAggFunc { + @Param ${type.inputType}Holder in; + @Workspace ${type.runningType}Holder value; + @Output ${type.outputType}Holder out; + @Workspace BigIntHolder nonNullCount; + <#if type.major == "VarDecimal" || type.major == "bytes"> + @Inject DrillBuf buffer; + </#if> + + public void setup() { + nonNullCount = new BigIntHolder(); + nonNullCount.value = 0; + value = new ${type.runningType}Holder(); + } + + @Override + public void add() { + <#if type.inputType?starts_with("Nullable")> + sout: { + if (in.isSet == 0) { + // processing nullable input and the value is null, so don't do anything... 
+ break sout; + } + </#if> + if (nonNullCount.value == 0) { + nonNullCount.value = 1; + } else { + throw org.apache.drill.common.exceptions.UserException.functionError() + .message("Input for single_value function has more than one row") + .build(); + } + <#if type.major == "primitive"> + value.value = in.value; + <#elseif type.major == "IntervalDay"> + value.days = in.days; + value.milliseconds = in.milliseconds; + <#elseif type.major == "Interval"> + value.days = in.days; + value.milliseconds = in.milliseconds; + value.months = in.months; + <#elseif type.major == "VarDecimal"> + value.start = in.start; + value.end = in.end; + value.buffer = in.buffer; + value.scale = in.scale; + value.precision = in.precision; + <#elseif type.major == "bytes"> + value.start = in.start; + value.end = in.end; + value.buffer = in.buffer; + </#if> + <#if type.inputType?starts_with("Nullable")> + } // end of sout block + </#if> + } + + @Override + public void output() { + if (nonNullCount.value > 0) { + out.isSet = 1; + <#if type.major == "primitive"> + out.value = value.value; + <#elseif type.major == "IntervalDay"> + out.days = value.days; + out.milliseconds = value.milliseconds; + <#elseif type.major == "Interval"> + out.days = value.days; + out.milliseconds = value.milliseconds; + out.months = value.months; + <#elseif type.major == "VarDecimal"> + out.start = value.start; + out.end = value.end; + out.buffer = buffer.reallocIfNeeded(value.end - value.start); + out.buffer.writeBytes(value.buffer, value.start, value.end - value.start); + out.scale = value.scale; + out.precision = value.precision; + <#elseif type.major == "bytes"> + out.start = value.start; + out.end = value.end; + out.buffer = buffer.reallocIfNeeded(value.end - value.start); + out.buffer.writeBytes(value.buffer, value.start, value.end - value.start); + </#if> + } else { + out.isSet = 0; + } + } + + @Override + public void reset() { + value = new ${type.runningType}Holder(); + nonNullCount.value = 0; + } + } +</#list> 
+} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java index e4dab91..b974537 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java @@ -17,9 +17,22 @@ */ package org.apache.drill.exec.physical.impl.join; +import org.apache.calcite.rel.RelShuttleImpl; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.TableFunctionScan; +import org.apache.calcite.rel.logical.LogicalCorrelate; +import org.apache.calcite.rel.logical.LogicalExchange; +import org.apache.calcite.rel.logical.LogicalIntersect; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalMinus; +import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.rel.logical.LogicalUnion; +import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.util.Util; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.logical.data.JoinCondition; import org.apache.calcite.rel.RelNode; @@ -35,9 +48,11 @@ import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.planner.logical.DrillLimitRel; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.resolver.TypeCastRules; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -220,7 +235,13 @@ public class JoinUtils { if (currentrel instanceof DrillAggregateRel) { agg = 
(DrillAggregateRel)currentrel; } else if (currentrel instanceof RelSubset) { - currentrel = ((RelSubset)currentrel).getBest() ; + currentrel = ((RelSubset) currentrel).getBest() ; + } else if (currentrel instanceof DrillLimitRel) { + // TODO: Improve this check when DRILL-5691 is fixed. + // The problem is that RelMdMaxRowCount currently cannot be used + // due to CALCITE-1048. + Integer fetchValue = ((RexLiteral) ((DrillLimitRel) currentrel).getFetch()).getValueAs(Integer.class); + return fetchValue != null && fetchValue <= 1; } else if (currentrel.getInputs().size() == 1) { // If the rel is not an aggregate or RelSubset, but is a single-input rel (could be Project, // Filter, Sort etc.), check its input @@ -234,6 +255,17 @@ public class JoinUtils { if (agg.getGroupSet().isEmpty()) { return true; } + // Checks that expression in group by is a single and it is literal. + // When Calcite rewrites EXISTS sub-queries using SubQueryRemoveRule rules, + // it creates project with TRUE literal in expressions list and aggregate on top of it + // with empty call list and literal from project expression in group set. + if (agg.getAggCallList().isEmpty() && agg.getGroupSet().cardinality() == 1) { + ProjectExpressionsCollector expressionsCollector = new ProjectExpressionsCollector(); + agg.accept(expressionsCollector); + List<RexNode> projectedExpressions = expressionsCollector.getProjectedExpressions(); + return projectedExpressions.size() == 1 + && RexUtil.isLiteral(projectedExpressions.get(agg.getGroupSet().nth(0)), true); + } } return false; } @@ -267,4 +299,75 @@ public class JoinUtils { return isScalarSubquery(left) || isScalarSubquery(right); } + /** + * Collects expressions list from the input project. + * For the case when input rel node has single input, its input is taken. 
+ */ + private static class ProjectExpressionsCollector extends RelShuttleImpl { + private final List<RexNode> expressions = new ArrayList<>(); + + @Override + public RelNode visit(RelNode other) { + // RelShuttleImpl doesn't have visit methods for Project and RelSubset. + if (other instanceof RelSubset) { + return visit((RelSubset) other); + } else if (other instanceof Project) { + return visit((Project) other); + } + return super.visit(other); + } + + @Override + public RelNode visit(TableFunctionScan scan) { + return scan; + } + + @Override + public RelNode visit(LogicalJoin join) { + return join; + } + + @Override + public RelNode visit(LogicalCorrelate correlate) { + return correlate; + } + + @Override + public RelNode visit(LogicalUnion union) { + return union; + } + + @Override + public RelNode visit(LogicalIntersect intersect) { + return intersect; + } + + @Override + public RelNode visit(LogicalMinus minus) { + return minus; + } + + @Override + public RelNode visit(LogicalSort sort) { + return sort; + } + + @Override + public RelNode visit(LogicalExchange exchange) { + return exchange; + } + + private RelNode visit(Project project) { + expressions.addAll(project.getProjects()); + return project; + } + + private RelNode visit(RelSubset subset) { + return Util.first(subset.getBest(), subset.getOriginal()).accept(this); + } + + public List<RexNode> getProjectedExpressions() { + return expressions; + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java index 2a79751..b78d76c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java @@ -113,6 +113,16 @@ public enum PlannerPhase { } }, + SUBQUERY_REWRITE("Sub-queries rewrites") { + public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) { + 
return RuleSets.ofList( + RuleInstance.SUB_QUERY_FILTER_REMOVE_RULE, + RuleInstance.SUB_QUERY_PROJECT_REMOVE_RULE, + RuleInstance.SUB_QUERY_JOIN_REMOVE_RULE + ); + } + }, + LOGICAL_PRUNE("Logical Planning (with partition pruning)") { public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) { return PlannerPhase.mergedRuleSets( diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java index 80bbe88..8aec96c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java @@ -40,6 +40,7 @@ import org.apache.calcite.rel.rules.ProjectToWindowRule; import org.apache.calcite.rel.rules.ProjectWindowTransposeRule; import org.apache.calcite.rel.rules.ReduceExpressionsRule; import org.apache.calcite.rel.rules.SortRemoveRule; +import org.apache.calcite.rel.rules.SubQueryRemoveRule; import org.apache.calcite.rel.rules.UnionToDistinctRule; import org.apache.drill.exec.planner.logical.DrillConditions; import org.apache.drill.exec.planner.logical.DrillRelFactories; @@ -130,4 +131,13 @@ public interface RuleInstance { FilterRemoveIsNotDistinctFromRule REMOVE_IS_NOT_DISTINCT_FROM_RULE = new FilterRemoveIsNotDistinctFromRule(DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY)); + + SubQueryRemoveRule SUB_QUERY_FILTER_REMOVE_RULE = + new SubQueryRemoveRule.SubQueryFilterRemoveRule(DrillRelFactories.LOGICAL_BUILDER); + + SubQueryRemoveRule SUB_QUERY_PROJECT_REMOVE_RULE = + new SubQueryRemoveRule.SubQueryProjectRemoveRule(DrillRelFactories.LOGICAL_BUILDER); + + SubQueryRemoveRule SUB_QUERY_JOIN_REMOVE_RULE = + new SubQueryRemoveRule.SubQueryJoinRemoveRule(DrillRelFactories.LOGICAL_BUILDER); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java index f3c6ce0..cd696ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java @@ -32,8 +32,6 @@ import org.apache.drill.exec.planner.sql.DrillOperatorTable; import org.apache.drill.exec.planner.sql.parser.DrillCalciteWrapperUtility; import org.apache.drill.exec.util.ApproximateStringMatcher; import org.apache.drill.exec.work.foreman.SqlUnsupportedException; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelShuttleImpl; @@ -46,7 +44,6 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.fun.SqlSingleValueAggFunction; import org.apache.calcite.util.NlsString; /** @@ -79,19 +76,6 @@ public class PreProcessLogicalRel extends RelShuttleImpl { } @Override - public RelNode visit(LogicalAggregate aggregate) { - for(AggregateCall aggregateCall : aggregate.getAggCallList()) { - if(aggregateCall.getAggregation() instanceof SqlSingleValueAggFunction) { - unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.FUNCTION, - "Non-scalar sub-query used in an expression\n" + - "See Apache Drill JIRA: DRILL-1937"); - throw new UnsupportedOperationException(); - } - } - return visitChild(aggregate, 0, aggregate.getInput()); - } - - @Override public RelNode visit(LogicalProject project) { final List<RexNode> projExpr = Lists.newArrayList(); for(RexNode rexNode : project.getChildExps()) { @@ -168,7 +152,7 @@ public class PreProcessLogicalRel extends RelShuttleImpl { exprList.add(newExpr); } - if 
(rewrite == true) { + if (rewrite) { LogicalProject newProject = project.copy(project.getTraitSet(), project.getInput(0), exprList, project.getRowType()); return visitChild(newProject, 0, project.getInput()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java index b8659d1..3f65ad2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java @@ -64,7 +64,6 @@ import org.apache.calcite.sql.validate.SqlValidatorCatalogReader; import org.apache.calcite.sql.validate.SqlValidatorImpl; import org.apache.calcite.sql.validate.SqlValidatorScope; import org.apache.calcite.sql.validate.SqlValidatorUtil; -import org.apache.calcite.sql2rel.RelDecorrelator; import org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.calcite.tools.RelBuilderFactory; import org.apache.calcite.util.Util; @@ -384,10 +383,7 @@ public class SqlConverter { //To avoid unexpected column errors set a value of top to false final RelRoot rel = sqlToRelConverter.convertQuery(validatedNode, false, false); final RelRoot rel2 = rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true)); - final RelRoot rel3 = rel2.withRel( - RelDecorrelator.decorrelateQuery(rel2.rel, - sqlToRelConverterConfig.getRelBuilderFactory().create(cluster, null))); - return rel3; + return rel2; } private class Expander implements RelOptTable.ViewExpander { @@ -478,7 +474,7 @@ public class SqlConverter { @Override public boolean isExpand() { - return SqlToRelConverterConfig.DEFAULT.isExpand(); + return false; } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 98e017f..1e671ff 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -55,6 +55,7 @@ import org.apache.calcite.rex.RexUtil; import org.apache.calcite.sql.SqlExplainLevel; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.calcite.sql2rel.RelDecorrelator; import org.apache.calcite.tools.Program; import org.apache.calcite.tools.Programs; import org.apache.calcite.tools.RelConversionException; @@ -78,6 +79,7 @@ import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.cost.DrillDefaultRelMetadataProvider; import org.apache.drill.exec.planner.logical.DrillProjectRel; import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.planner.logical.DrillRelFactories; import org.apache.drill.exec.planner.logical.DrillScreenRel; import org.apache.drill.exec.planner.logical.DrillStoreRel; import org.apache.drill.exec.planner.logical.PreProcessLogicalRel; @@ -658,10 +660,16 @@ public class DefaultSqlHandler extends AbstractSqlHandler { return typedSqlNode; } - private RelNode convertToRel(SqlNode node) throws RelConversionException { + private RelNode convertToRel(SqlNode node) { final RelNode convertedNode = config.getConverter().toRel(node).rel; log("INITIAL", convertedNode, logger, null); - return transform(PlannerType.HEP, PlannerPhase.WINDOW_REWRITE, convertedNode); + RelNode transformedNode = transform(PlannerType.HEP, + PlannerPhase.SUBQUERY_REWRITE, convertedNode); + + RelNode decorrelatedNode = RelDecorrelator.decorrelateQuery(transformedNode, + DrillRelFactories.LOGICAL_BUILDER.create(transformedNode.getCluster(), null)); + + return transform(PlannerType.HEP, PlannerPhase.WINDOW_REWRITE, decorrelatedNode); } private RelNode preprocessNode(RelNode rel) throws SqlUnsupportedException { diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java b/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java index 47f0ae8..d40540e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java @@ -58,4 +58,50 @@ public class TestCorrelation extends PlanTestBase { .run(); } + @Test + public void testExistsScalarSubquery() throws Exception { + String query = + "SELECT employee_id\n" + + "FROM cp.`employee.json`\n" + + "WHERE EXISTS\n" + + " (SELECT *\n" + + " FROM cp.`employee.json` cs2\n" + + " )\n" + + "limit 1"; + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("employee_id") + .baselineValues(1L) + .go(); + } + + @Test + public void testSeveralExistsCorrelateSubquery() throws Exception { + String query = + "SELECT cs1.employee_id\n" + + "FROM cp.`employee.json` cs1,\n" + + " cp.`employee.json` cs3\n" + + "WHERE cs1.hire_date = cs3.hire_date\n" + + " AND EXISTS\n" + + " (SELECT *\n" + + " FROM cp.`employee.json` cs2\n" + + " WHERE " + + " cs1.position_id > cs2.position_id\n" + + " AND" + + " cs1.epmloyee_id = cs2.epmloyee_id" + + " )\n" + + " AND EXISTS\n" + + " (SELECT *\n" + + " FROM cp.`employee.json` cr1\n" + + " WHERE cs1.position_id = cr1.position_id)\n" + + "LIMIT 1"; + + testBuilder() + .sqlQuery(query) + .unOrdered() + .expectsEmptyResultSet() + .go(); + } } \ No newline at end of file diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java index 781fce3..679d01e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java @@ -47,30 +47,6 @@ public class TestDisabledFunctionality extends BaseTestQuery { throw ex; } - @Test(expected = UnsupportedFunctionException.class) // see DRILL-1937 - public 
void testDisabledExplainplanForComparisonWithNonscalarSubquery() throws Exception { - try { - test("explain plan for select n_name from cp.`tpch/nation.parquet` " + - "where n_nationkey = " + - "(select r_regionkey from cp.`tpch/region.parquet` " + - "where r_regionkey = 1)"); - } catch(UserException ex) { - throwAsUnsupportedException(ex); - } - } - - @Test(expected = UnsupportedFunctionException.class) // see DRILL-1937 - public void testDisabledComparisonWithNonscalarSubquery() throws Exception { - try { - test("select n_name from cp.`tpch/nation.parquet` " + - "where n_nationkey = " + - "(select r_regionkey from cp.`tpch/region.parquet` " + - "where r_regionkey = 1)"); - } catch(UserException ex) { - throwAsUnsupportedException(ex); - } - } - @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921 public void testDisabledIntersect() throws Exception { try { @@ -215,19 +191,6 @@ public class TestDisabledFunctionality extends BaseTestQuery { } } - @Test(expected = UnsupportedFunctionException.class) // see DRILL-1325, DRILL-2155, see DRILL-1937 - public void testMultipleUnsupportedOperatorations() throws Exception { - try { - test("select a.lastname, b.n_name " + - "from cp.`employee.json` a, cp.`tpch/nation.parquet` b " + - "where b.n_nationkey = " + - "(select r_regionkey from cp.`tpch/region.parquet` " + - "where r_regionkey = 1)"); - } catch(UserException ex) { - throwAsUnsupportedException(ex); - } - } - @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-2068, DRILL-1325 public void testExplainPlanForCartesianJoin() throws Exception { try { diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java index 3e17a7a..adc8e35 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java @@ -1165,4 +1165,37 @@ public class 
TestExampleQueries extends BaseTestQuery { .build() .run(); } + + @Test + public void testComparisonWithSingleValueSubQuery() throws Exception { + String query = "select n_name from cp.`tpch/nation.parquet` " + + "where n_nationkey = " + + "(select r_regionkey from cp.`tpch/region.parquet` " + + "where r_regionkey = 1)"; + PlanTestBase.testPlanMatchingPatterns(query, + new String[]{"agg.*SINGLE_VALUE", "Filter.*=\\(\\$0, 1\\)"}); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("n_name") + .baselineValues("ARGENTINA") + .go(); + } + + @Test + public void testMultipleComparisonWithSingleValueSubQuery() throws Exception { + String query = "select a.last_name, b.n_name " + + "from cp.`employee.json` a, cp.`tpch/nation.parquet` b " + + "where b.n_nationkey = " + + "(select r_regionkey from cp.`tpch/region.parquet` " + + "where r_regionkey = 1) limit 1"; + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("last_name", "n_name") + .baselineValues("Nowmer", "ARGENTINA") + .go(); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java index 54f7250..fd97564 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java @@ -138,7 +138,6 @@ public class TestTpchDistributed extends BaseTestQuery { } @Test - @Ignore public void tpch21() throws Exception{ testDistributed("queries/tpch/21.sql"); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java index 56287c9..ed242be 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java @@ -145,7 +145,6 @@ public class TestTpchDistributedStreaming extends 
BaseTestQuery { } @Test - @Ignore public void tpch21() throws Exception{ testDistributed("queries/tpch/21.sql"); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java index 3cbe5ef..1aa9de2 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java @@ -148,7 +148,6 @@ public class TestTpchExplain extends BaseTestQuery { } @Test - @Ignore public void tpch21() throws Exception{ doExplain("queries/tpch/21.sql"); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java index 9400c7c..51aea18 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java @@ -140,7 +140,6 @@ public class TestTpchLimit0 extends BaseTestQuery { } @Test - @Ignore public void tpch21() throws Exception{ testLimitZero("queries/tpch/21.sql"); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java index 1745301..f995ce6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java @@ -141,7 +141,6 @@ public class TestTpchPlanning extends PlanningBase { } @Test - @Ignore // DRILL-519 public void tpch21() throws Exception { testSqlPlanFromFile("queries/tpch/21.sql"); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java index 2863bbb..a3e34c0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java @@ -139,7 +139,6 @@ public class 
TestTpchSingleMode extends BaseTestQuery { } @Test - @Ignore public void tpch21() throws Exception{ testSingleMode("queries/tpch/21.sql"); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java index 0a5b7cd..445b18e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java @@ -25,6 +25,9 @@ import org.apache.drill.common.exceptions.UserRemoteException; import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.util.Text; import org.apache.drill.test.BaseTestQuery; import org.apache.drill.categories.OperatorTest; import org.apache.drill.PlanTestBase; @@ -37,23 +40,32 @@ import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.junit.BeforeClass; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.math.BigDecimal; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @Category({SqlFunctionTest.class, OperatorTest.class, PlannerTest.class}) public class TestAggregateFunctions extends BaseTestQuery { + @Rule + public ExpectedException thrown = ExpectedException.none(); + 
@BeforeClass public static void setupFiles() { dirTestWatcher.copyResourceToRoot(Paths.get("agg")); @@ -521,6 +533,7 @@ public class TestAggregateFunctions extends BaseTestQuery { .go(); } + @Test public void minMaxEmptyNonNullableInput() throws Exception { // test min and max functions on required type @@ -567,6 +580,73 @@ public class TestAggregateFunctions extends BaseTestQuery { } } + @Test + public void testSingleValueFunction() throws Exception { + List<String> tableNames = ImmutableList.of( + "cp.`parquet/alltypes_required.parquet`", + "cp.`parquet/alltypes_optional.parquet`"); + for (String tableName : tableNames) { + final QueryDataBatch result = + testSqlWithResults(String.format("select * from %s limit 1", tableName)).get(0); + + final Map<String, StringBuilder> functions = new HashMap<>(); + functions.put("single_value", new StringBuilder()); + + final Map<String, Object> resultingValues = new HashMap<>(); + final List<String> columns = new ArrayList<>(); + + final RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); + loader.load(result.getHeader().getDef(), result.getData()); + + for (VectorWrapper<?> vectorWrapper : loader.getContainer()) { + final String fieldName = vectorWrapper.getField().getName(); + Object object = vectorWrapper.getValueVector().getAccessor().getObject(0); + // VarCharVector returns Text instance, but baseline values should contain String value + if (object instanceof Text) { + object = object.toString(); + } + resultingValues.put(String.format("`%s`", fieldName), object); + for (Map.Entry<String, StringBuilder> function : functions.entrySet()) { + function.getValue() + .append(function.getKey()) + .append("(") + .append(fieldName) + .append(") ") + .append(fieldName) + .append(","); + } + columns.add(fieldName); + } + loader.clear(); + result.release(); + + String columnsList = columns.stream() + .collect(Collectors.joining(", ")); + + final List<Map<String, Object>> baselineRecords = new ArrayList<>(); + 
baselineRecords.add(resultingValues); + + for (StringBuilder selectBody : functions.values()) { + selectBody.setLength(selectBody.length() - 1); + + testBuilder() + .sqlQuery("select %s from (select %s from %s limit 1)", + selectBody.toString(), columnsList, tableName) + .unOrdered() + .baselineRecords(baselineRecords) + .go(); + } + } + } + + @Test + public void testSingleValueWithMultipleValuesInput() throws Exception { + thrown.expect(UserRemoteException.class); + thrown.expectMessage(containsString("FUNCTION ERROR")); + thrown.expectMessage(containsString("Input for single_value function has more than one row")); + test("select single_value(n_name) from cp.`tpch/nation.parquet`"); + } + /* * Streaming agg on top of a filter produces wrong results if the first two batches are filtered out. * In the below test we have three files in the input directory and since the ordering of reading diff --git a/pom.xml b/pom.xml index 242b134..f69288e 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ <dep.guava.version>18.0</dep.guava.version> <forkCount>2</forkCount> <parquet.version>1.10.0</parquet.version> - <calcite.version>1.16.0-drill-r3</calcite.version> + <calcite.version>1.16.0-drill-r4</calcite.version> <avatica.version>1.11.0</avatica.version> <janino.version>2.7.6</janino.version> <sqlline.version>1.1.9-drill-r7</sqlline.version>
