This is an automated email from the ASF dual-hosted git repository. sorabh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit a27a1047b16621f6c3c6c181c97f8713231f6c6c Author: Gautam Parai <[email protected]> AuthorDate: Tue Apr 3 19:18:31 2018 -0700 DRILL-6375 : Support for ANY_VALUE aggregate function closes #1256 --- .../apache/drill/exec/expr/fn/HiveFuncHolder.java | 2 +- .../java-exec/src/main/codegen/data/AggrTypes1.tdd | 46 +++ .../src/main/codegen/data/DecimalAggrTypes1.tdd | 6 + .../main/codegen/templates/AggrTypeFunctions1.java | 22 +- .../codegen/templates/ComplexAggrFunctions1.java | 120 ++++++++ .../templates/DateIntervalAggrFunctions1.java | 11 +- .../Decimal/DecimalAggrTypeFunctions1.java | 97 +++++++ .../codegen/templates/VarCharAggrFunctions1.java | 11 + .../compile/sig/ConstantExpressionIdentifier.java | 7 + .../apache/drill/exec/expr/EvaluationVisitor.java | 17 +- .../drill/exec/expr/fn/AbstractFuncHolder.java | 2 +- .../drill/exec/expr/fn/DrillAggFuncHolder.java | 27 +- .../expr/fn/DrillComplexWriterAggFuncHolder.java | 142 ++++++++++ .../apache/drill/exec/expr/fn/DrillFuncHolder.java | 34 ++- .../drill/exec/expr/fn/FunctionConverter.java | 4 +- .../apache/drill/exec/expr/fn/impl/Mappify.java | 2 +- .../drill/exec/expr/fn/impl/MappifyUtility.java | 34 ++- .../physical/impl/aggregate/StreamingAggBatch.java | 67 ++++- .../drill/exec/planner/physical/HashAggPrule.java | 14 +- .../drill/exec/vector/complex/MapUtility.java | 308 ++++++++++++++++++++- .../physical/impl/agg/TestAggWithAnyValue.java | 149 ++++++++++ .../test/resources/store/json/test_anyvalue.json | 50 ++++ .../codegen/templates/RepeatedValueVectors.java | 1 + .../src/main/codegen/templates/ValueHolders.java | 11 +- .../exec/expr/holders/RepeatedListHolder.java | 2 + .../drill/exec/expr/holders/RepeatedMapHolder.java | 3 + .../drill/common/expression/parser/ExprLexer.g | 1 + .../drill/common/expression/parser/ExprParser.g | 5 + .../common/expression/AnyValueExpression.java | 69 +++++ .../common/expression/ExpressionStringBuilder.java | 8 + .../common/expression/FunctionCallFactory.java | 8 + .../expression/visitors/AbstractExprVisitor.java | 6 + .../expression/visitors/AggregateChecker.java | 6 + .../visitors/ConditionalExprOptimizer.java | 7 + .../expression/visitors/ConstantChecker.java | 6 + .../common/expression/visitors/ExprVisitor.java | 2 + .../expression/visitors/ExpressionValidator.java | 7 + 37 files changed, 1265 insertions(+), 49 deletions(-) diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/expr/fn/HiveFuncHolder.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/expr/fn/HiveFuncHolder.java index 8e7b645..80f299e 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/expr/fn/HiveFuncHolder.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/expr/fn/HiveFuncHolder.java @@ -130,7 +130,7 @@ public class HiveFuncHolder extends AbstractFuncHolder { * @return workspace variables */ @Override - public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables){ + public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables, FieldReference fieldReference){ JVar[] workspaceJVars = new JVar[5]; workspaceJVars[0] = g.declareClassField("returnOI", g.getModel()._ref(ObjectInspector.class)); diff --git a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd index 202f539..3fb2601 100644 --- a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd +++ b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd @@ -88,6 +88,52 @@ {inputType: "Interval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"}, {inputType: "NullableInterval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"} ] + }, + {className: "AnyValue", funcName: "any_value", types: [ + {inputType: "Bit", outputType: "NullableBit", runningType: "Bit", major: "Numeric"}, + {inputType: "Int", outputType: "NullableInt", runningType: "Int", major: "Numeric"}, + {inputType: "BigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "Numeric"}, + {inputType: "NullableBit", outputType: "NullableBit", runningType: "Bit", major: "Numeric"}, + {inputType: "NullableInt", outputType: "NullableInt", runningType: "Int", major: "Numeric"}, + {inputType: "NullableBigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "Numeric"}, + {inputType: "Float4", outputType: "NullableFloat4", runningType: "Float4", major: "Numeric"}, + {inputType: "Float8", outputType: "NullableFloat8", runningType: "Float8", major: "Numeric"}, + {inputType: "NullableFloat4", outputType: "NullableFloat4", runningType: "Float4", major: "Numeric"}, + {inputType: "NullableFloat8", outputType: "NullableFloat8", runningType: "Float8", major: "Numeric"}, + {inputType: "Date", outputType: "NullableDate", runningType: "Date", major: "Date", initialValue: "0"}, + {inputType: "NullableDate", outputType: "NullableDate", runningType: "Date", major: "Date", initialValue: "0"}, + {inputType: "TimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "Date", initialValue: "0"}, + {inputType: "NullableTimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "Date", initialValue: "0"}, + {inputType: "Time", outputType: "NullableTime", runningType: "Time", major: "Date", initialValue: "0"}, + {inputType: "NullableTime", outputType: "NullableTime", runningType: "Time", major: "Date", initialValue: "0"}, + {inputType: "IntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "Date", initialValue: "0"}, + {inputType: "NullableIntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "Date", initialValue: "0"}, + {inputType: "IntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "Date", initialValue: "0"}, + {inputType: "NullableIntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "Date", initialValue: "0"}, + {inputType: "Interval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"}, + {inputType: "NullableInterval", outputType: "NullableInterval", runningType: "Interval", major: "Date", initialValue: "0"}, + {inputType: "VarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "VarBytes", initialValue: ""}, + {inputType: "NullableVarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "VarBytes", initialValue: ""}, + {inputType: "VarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "VarBytes"}, + {inputType: "NullableVarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "VarBytes"} + {inputType: "List", outputType: "List", runningType: "List", major: "Complex"} + {inputType: "Map", outputType: "Map", runningType: "Map", major: "Complex"} + {inputType: "RepeatedBit", outputType: "RepeatedBit", runningType: "RepeatedBit", major: "Complex"}, + {inputType: "RepeatedInt", outputType: "RepeatedInt", runningType: "RepeatedInt", major: "Complex"}, + {inputType: "RepeatedBigInt", outputType: "RepeatedBigInt", runningType: "RepeatedBigInt", major: "Complex"}, + {inputType: "RepeatedFloat4", outputType: "RepeatedFloat4", runningType: "RepeatedFloat4", major: "Complex"}, + {inputType: "RepeatedFloat8", outputType: "RepeatedFloat8", runningType: "RepeatedFloat8", major: "Complex"}, + {inputType: "RepeatedDate", outputType: "RepeatedDate", runningType: "RepeatedDate", major: "Complex"}, + {inputType: "RepeatedTimeStamp", outputType: "RepeatedTimeStamp", runningType: "RepeatedTimeStamp", major: "Complex"}, + {inputType: "RepeatedTime", outputType: "RepeatedTime", runningType: "RepeatedTime", major: "Complex"}, + {inputType: "RepeatedIntervalDay", outputType: "RepeatedIntervalDay", runningType: "RepeatedIntervalDay", major: "Complex"}, + {inputType: "RepeatedIntervalYear", outputType: "RepeatedIntervalYear", runningType: "RepeatedIntervalYear", major: "Complex"}, + {inputType: "RepeatedInterval", outputType: "RepeatedInterval", runningType: "RepeatedInterval", major: "Complex"}, + {inputType: "RepeatedVarChar", outputType: "RepeatedVarChar", runningType: "RepeatedVarChar", major: "Complex"}, + {inputType: "RepeatedVarBinary", outputType: "RepeatedVarBinary", runningType: "RepeatedVarBinary", major: "Complex"}, + {inputType: "RepeatedList", outputType: "RepeatedList", runningType: "RepeatedList", major: "Complex"}, + {inputType: "RepeatedMap", outputType: "RepeatedMap", runningType: "RepeatedMap", major: "Complex"} + ] } ] } diff --git a/exec/java-exec/src/main/codegen/data/DecimalAggrTypes1.tdd b/exec/java-exec/src/main/codegen/data/DecimalAggrTypes1.tdd index 7da2d07..003bbfa 100644 --- a/exec/java-exec/src/main/codegen/data/DecimalAggrTypes1.tdd +++ b/exec/java-exec/src/main/codegen/data/DecimalAggrTypes1.tdd @@ -35,6 +35,12 @@ {inputType: "VarDecimal", outputType: "NullableVarDecimal"}, {inputType: "NullableVarDecimal", outputType: "NullableVarDecimal"} ] + }, + {className: "AnyValue", funcName: "any_value", types: [ + {inputType: "VarDecimal", outputType: "NullableVarDecimal"}, + {inputType: "NullableVarDecimal", outputType: "NullableVarDecimal"} + {inputType: "RepeatedVarDecimal", outputType: "RepeatedVarDecimal"} + ] } ] } diff --git a/exec/java-exec/src/main/codegen/templates/AggrTypeFunctions1.java b/exec/java-exec/src/main/codegen/templates/AggrTypeFunctions1.java index ebf20e5..59d3715 100644 --- a/exec/java-exec/src/main/codegen/templates/AggrTypeFunctions1.java +++ b/exec/java-exec/src/main/codegen/templates/AggrTypeFunctions1.java @@ -61,11 +61,11 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu value = new ${type.runningType}Holder(); nonNullCount = new BigIntHolder(); nonNullCount.value = 0; - <#if aggrtype.funcName == "sum"> + <#if aggrtype.funcName == "sum" || aggrtype.funcName == "any_value"> value.value = 0; <#elseif aggrtype.funcName == "min"> <#if type.runningType?starts_with("Bit")> - value.value = 1; + value.value = 1; <#elseif type.runningType?starts_with("Int")> value.value = Integer.MAX_VALUE; <#elseif type.runningType?starts_with("BigInt")> @@ -77,7 +77,7 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu </#if> <#elseif aggrtype.funcName == "max"> <#if type.runningType?starts_with("Bit")> - value.value = 0; + value.value = 0; <#elseif type.runningType?starts_with("Int")> value.value = Integer.MIN_VALUE; <#elseif type.runningType?starts_with("BigInt")> @@ -110,19 +110,21 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu value.value = Float.isNaN(value.value) ? in.value : Math.min(value.value, in.value); } <#elseif type.inputType?contains("Float8")> - if(!Double.isNaN(in.value)) { - value.value = Double.isNaN(value.value) ? in.value : Math.min(value.value, in.value); - } - <#else> + if(!Double.isNaN(in.value)) { + value.value = Double.isNaN(value.value) ? in.value : Math.min(value.value, in.value); + } + <#else> value.value = Math.min(value.value, in.value); - </#if> + </#if> <#elseif aggrtype.funcName == "max"> value.value = Math.max(value.value, in.value); <#elseif aggrtype.funcName == "sum"> value.value += in.value; <#elseif aggrtype.funcName == "count"> value.value++; - <#else> + <#elseif aggrtype.funcName == "any_value"> + value.value = in.value; + <#else> // TODO: throw an error ? </#if> <#if type.inputType?starts_with("Nullable")> @@ -143,7 +145,7 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu @Override public void reset() { nonNullCount.value = 0; - <#if aggrtype.funcName == "sum" || aggrtype.funcName == "count"> + <#if aggrtype.funcName == "sum" || aggrtype.funcName == "count" || aggrtype.funcName == "any_value"> value.value = 0; <#elseif aggrtype.funcName == "min"> <#if type.runningType?starts_with("Int")> diff --git a/exec/java-exec/src/main/codegen/templates/ComplexAggrFunctions1.java b/exec/java-exec/src/main/codegen/templates/ComplexAggrFunctions1.java new file mode 100644 index 0000000..6aa92e3 --- /dev/null +++ b/exec/java-exec/src/main/codegen/templates/ComplexAggrFunctions1.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +<@pp.dropOutputFile /> + + + +<#list aggrtypes1.aggrtypes as aggrtype> +<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/gaggr/${aggrtype.className}ComplexFunctions.java" /> + +<#include "/@includes/license.ftl" /> + +/* + * This class is generated using freemarker and the ${.template_name} template. + */ + +<#-- A utility class that is used to generate java code for aggr functions that maintain a single --> +<#-- running counter to hold the result. This includes: ANY_VALUE. --> + +package org.apache.drill.exec.expr.fn.impl.gaggr; + +import org.apache.drill.exec.expr.DrillAggFunc; +import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.annotations.Param; +import org.apache.drill.exec.expr.annotations.Workspace; +import org.apache.drill.exec.expr.holders.*; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.drill.exec.vector.complex.MapUtility; +import org.apache.drill.exec.vector.complex.writer.*; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.*; + +@SuppressWarnings("unused") + +public class ${aggrtype.className}ComplexFunctions { +static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${aggrtype.className}ComplexFunctions.class); + +<#list aggrtype.types as type> +<#if type.major == "Complex"> + +@FunctionTemplate(name = "${aggrtype.funcName}", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE) +public static class ${type.inputType}${aggrtype.className} implements DrillAggFunc{ + @Param ${type.inputType}Holder inHolder; + @Workspace BigIntHolder nonNullCount; + @Output ComplexWriter writer; + + public void setup() { + nonNullCount = new BigIntHolder(); + nonNullCount.value = 0; + } + + @Override + public void add() { + <#if type.inputType?starts_with("Nullable")> + sout: { + if (inHolder.isSet == 0) { + // processing nullable input and the value is null, so don't do anything... + break sout; + } + </#if> + <#if aggrtype.funcName == "any_value"> + <#if type.runningType?starts_with("Map")> + if (nonNullCount.value == 0) { + org.apache.drill.exec.expr.fn.impl.MappifyUtility.createMap(inHolder.reader, writer, "any_value"); + } + <#elseif type.runningType?starts_with("RepeatedMap")> + if (nonNullCount.value == 0) { + org.apache.drill.exec.expr.fn.impl.MappifyUtility.createRepeatedMapOrList(inHolder.reader, writer, "any_value"); + } + <#elseif type.runningType?starts_with("List")> + if (nonNullCount.value == 0) { + org.apache.drill.exec.expr.fn.impl.MappifyUtility.createList(inHolder.reader, writer, "any_value"); + } + <#elseif type.runningType?starts_with("RepeatedList")> + if (nonNullCount.value == 0) { + org.apache.drill.exec.expr.fn.impl.MappifyUtility.createRepeatedMapOrList(inHolder.reader, writer, "any_value"); + } + <#elseif type.runningType?starts_with("Repeated")> + if (nonNullCount.value == 0) { + org.apache.drill.exec.expr.fn.impl.MappifyUtility.createList(inHolder.reader, writer, "any_value"); + } + </#if> + </#if> + nonNullCount.value = 1; + <#if type.inputType?starts_with("Nullable")> + } // end of sout block + </#if> + } + + @Override + public void output() { + //Do nothing since the complex writer takes care of everything! + } + + @Override + public void reset() { + <#if aggrtype.funcName == "any_value"> + nonNullCount.value = 0; + </#if> + } +} +</#if> +</#list> +} +</#list> \ No newline at end of file diff --git a/exec/java-exec/src/main/codegen/templates/DateIntervalAggrFunctions1.java b/exec/java-exec/src/main/codegen/templates/DateIntervalAggrFunctions1.java index f526575..8080ea7 100644 --- a/exec/java-exec/src/main/codegen/templates/DateIntervalAggrFunctions1.java +++ b/exec/java-exec/src/main/codegen/templates/DateIntervalAggrFunctions1.java @@ -131,7 +131,16 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu </#if> <#elseif aggrtype.funcName == "count"> value.value++; - <#else> + <#elseif aggrtype.funcName == "any_value"> + <#if type.outputType?ends_with("Interval")> + value.days = in.days; + value.months = in.months; + value.milliseconds = in.milliseconds; + <#elseif type.outputType?ends_with("IntervalDay")> + value.days = in.days; + value.milliseconds = in.milliseconds; + </#if> + <#else> // TODO: throw an error ? </#if> <#if type.inputType?starts_with("Nullable")> diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalAggrTypeFunctions1.java b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalAggrTypeFunctions1.java index 6b23f92..7f4ca15 100644 --- a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalAggrTypeFunctions1.java +++ b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalAggrTypeFunctions1.java @@ -39,6 +39,8 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; import org.apache.drill.exec.expr.annotations.Output; import org.apache.drill.exec.expr.annotations.Param; import org.apache.drill.exec.expr.annotations.Workspace; +import org.apache.drill.exec.vector.complex.writer.*; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.*; import javax.inject.Inject; import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.holders.*; @@ -124,6 +126,101 @@ public class Decimal${aggrtype.className}Functions { nonNullCount.value = 0; } } + <#elseif aggrtype.funcName.contains("any_value") && type.inputType?starts_with("Repeated")> + @FunctionTemplate(name = "${aggrtype.funcName}", + scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE, + returnType = FunctionTemplate.ReturnType.DECIMAL_AGGREGATE) + public static class ${type.inputType}${aggrtype.className} implements DrillAggFunc { + @Param ${type.inputType}Holder in; + @Output ComplexWriter writer; + @Workspace BigIntHolder nonNullCount; + + public void setup() { + nonNullCount = new BigIntHolder(); + } + + @Override + public void add() { + if (nonNullCount.value == 0) { + org.apache.drill.exec.expr.fn.impl.MappifyUtility.createList(in.reader, writer, "any_value"); + } + nonNullCount.value = 1; + } + + @Override + public void output() { + } + + @Override + public void reset() { + nonNullCount.value = 0; + } + } + <#elseif aggrtype.funcName.contains("any_value")> + @FunctionTemplate(name = "${aggrtype.funcName}", + scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE, + returnType = FunctionTemplate.ReturnType.DECIMAL_AGGREGATE) + public static class ${type.inputType}${aggrtype.className} implements DrillAggFunc { + @Param ${type.inputType}Holder in; + @Inject DrillBuf buffer; + @Workspace ObjectHolder value; + @Workspace IntHolder scale; + @Workspace IntHolder precision; + @Output ${type.outputType}Holder out; + @Workspace BigIntHolder nonNullCount; + + public void setup() { + value = new ObjectHolder(); + value.obj = java.math.BigDecimal.ZERO; + nonNullCount = new BigIntHolder(); + } + + @Override + public void add() { + <#if type.inputType?starts_with("Nullable")> + sout: { + if (in.isSet == 0) { + // processing nullable input and the value is null, so don't do anything... + break sout; + } + </#if> + if (nonNullCount.value == 0) { + value.obj=org.apache.drill.exec.util.DecimalUtility + .getBigDecimalFromDrillBuf(in.buffer,in.start,in.end-in.start,in.scale); + scale.value = in.scale; + precision.value = in.precision; + } + nonNullCount.value = 1; + <#if type.inputType?starts_with("Nullable")> + } // end of sout block + </#if> + } + + @Override + public void output() { + if (nonNullCount.value > 0) { + out.isSet = 1; + byte[] bytes = ((java.math.BigDecimal)value.obj).unscaledValue().toByteArray(); + int len = bytes.length; + out.start = 0; + out.buffer = buffer.reallocIfNeeded(len); + out.buffer.setBytes(0, bytes); + out.end = len; + out.scale = scale.value; + out.precision = precision.value; + } else { + out.isSet = 0; + } + } + + @Override + public void reset() { + scale.value = 0; + precision.value = 0; + value.obj = null; + nonNullCount.value = 0; + } + } <#elseif aggrtype.funcName == "max" || aggrtype.funcName == "min"> @FunctionTemplate(name = "${aggrtype.funcName}", diff --git a/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java b/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java index a5afce9..de5d705 100644 --- a/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java +++ b/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java @@ -90,6 +90,16 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu break sout; } </#if> + <#if aggrtype.className == "AnyValue"> + if (nonNullCount.value == 0) { + nonNullCount.value = 1; + int inputLength = in.end - in.start; + org.apache.drill.exec.expr.fn.impl.DrillByteArray tmp = (org.apache.drill.exec.expr.fn.impl.DrillByteArray) value.obj; + byte[] tempArray = new byte[inputLength]; + in.buffer.getBytes(in.start, tempArray, 0, inputLength); + tmp.setBytes(tempArray); + } + <#else> nonNullCount.value = 1; org.apache.drill.exec.expr.fn.impl.DrillByteArray tmp = (org.apache.drill.exec.expr.fn.impl.DrillByteArray) value.obj; int cmp = 0; @@ -121,6 +131,7 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu tmp.setBytes(tempArray); } } + </#if> <#if type.inputType?starts_with("Nullable")> } // end of sout block </#if> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/ConstantExpressionIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/ConstantExpressionIdentifier.java index d764663..0175d51 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/ConstantExpressionIdentifier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/ConstantExpressionIdentifier.java @@ -22,6 +22,7 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.Set; +import org.apache.drill.common.expression.AnyValueExpression; import org.apache.drill.common.expression.BooleanOperator; import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.ConvertExpression; @@ -235,6 +236,12 @@ public class ConstantExpressionIdentifier implements ExprVisitor<Boolean, Identi } @Override + public Boolean visitAnyValueExpression(AnyValueExpression e, + IdentityHashMap<LogicalExpression, Object> value) throws RuntimeException { + return e.getInput().accept(this, value); + } + + @Override public Boolean visitParameter(ValueExpressions.ParameterExpression e, IdentityHashMap<LogicalExpression, Object> value) throws RuntimeException { return false; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java index 64cfe66..4486972 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import java.util.Stack; +import org.apache.drill.common.expression.AnyValueExpression; import org.apache.drill.common.expression.BooleanOperator; import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.ConvertExpression; @@ -181,7 +182,7 @@ public class EvaluationVisitor { AbstractFuncHolder holder = (AbstractFuncHolder) holderExpr.getHolder(); - JVar[] workspaceVars = holder.renderStart(generator, null); + JVar[] workspaceVars = holder.renderStart(generator, null, holderExpr.getFieldReference()); if (holder.isNested()) { generator.getMappingSet().enterChild(); @@ -456,8 +457,7 @@ public class EvaluationVisitor { generator.getEvalBlock().add(eval); } else { - JExpression vector = e.isSuperReader() ? vv1.component(componentVariable) : vv1; - JExpression expr = vector.invoke("getReader"); + JExpression expr = vv1.invoke("getReader"); PathSegment seg = e.getReadPath(); JVar isNull = null; @@ -713,6 +713,17 @@ public class EvaluationVisitor { return fc.accept(this, value); } + @Override + public HoldingContainer visitAnyValueExpression(AnyValueExpression e, ClassGenerator<?> value) + throws RuntimeException { + + List<LogicalExpression> newArgs = Lists.newArrayList(); + newArgs.add(e.getInput()); // input_expr + + FunctionCall fc = new FunctionCall(AnyValueExpression.ANY_VALUE, newArgs, e.getPosition()); + return fc.accept(this, value); + } + private HoldingContainer visitBooleanAnd(BooleanOperator op, ClassGenerator<?> generator) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/AbstractFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/AbstractFuncHolder.java index 4902260..7dd58ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/AbstractFuncHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/AbstractFuncHolder.java @@ -32,7 +32,7 @@ import com.sun.codemodel.JVar; public abstract class AbstractFuncHolder implements FuncHolder { - public abstract JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables); + public abstract JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables, FieldReference fieldReference); public void renderMiddle(ClassGenerator<?> g, HoldingContainer[] inputVariables, JVar[] workspaceJVars) { // default implementation is add no code diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java index e1cd96f..1a5df67 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.Types; @@ -44,19 +45,19 @@ import com.sun.codemodel.JVar; class DrillAggFuncHolder extends DrillFuncHolder { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillAggFuncHolder.class); - private String setup() { + protected String setup() { return meth("setup"); } - private String reset() { + protected String reset() { return meth("reset", false); } - private String add() { + protected String add() { return meth("add"); } - private String output() { + protected String output() { return meth("output"); } - private String cleanup() { + protected String cleanup() { return meth("cleanup", false); } @@ -78,7 +79,7 @@ class DrillAggFuncHolder extends DrillFuncHolder { } @Override - public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables) { + public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables, FieldReference fieldReference) { if (!g.getMappingSet().isHashAggMapping()) { //Declare workspace vars for non-hash-aggregation. JVar[] workspaceJVars = declareWorkspaceVariables(g); generateBody(g, BlockType.SETUP, setup(), null, workspaceJVars, true); @@ -128,12 +129,20 @@ class DrillAggFuncHolder extends DrillFuncHolder { @Override public HoldingContainer renderEnd(ClassGenerator<?> classGenerator, HoldingContainer[] inputVariables, JVar[] workspaceJVars, FieldReference fieldReference) { - HoldingContainer out = classGenerator.declare(getReturnType(), false); + HoldingContainer out = null; + JVar internalOutput = null; + if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) { + out = classGenerator.declare(getReturnType(), false); + } JBlock sub = new JBlock(); + if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) { + internalOutput = sub.decl(JMod.FINAL, classGenerator.getHolderType(getReturnType()), getReturnValue().getName(), JExpr._new(classGenerator.getHolderType(getReturnType()))); + } classGenerator.getEvalBlock().add(sub); - JVar internalOutput = sub.decl(JMod.FINAL, classGenerator.getHolderType(getReturnType()), getReturnValue().getName(), JExpr._new(classGenerator.getHolderType(getReturnType()))); addProtectedBlock(classGenerator, sub, output(), null, workspaceJVars, false); - sub.assign(out.getHolder(), internalOutput); + if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) { + sub.assign(out.getHolder(), internalOutput); + } //hash aggregate uses workspace vectors. Initialization is done in "setup" and does not require "reset" block. if (!classGenerator.getMappingSet().isHashAggMapping()) { generateBody(classGenerator, BlockType.RESET, reset(), null, workspaceJVars, false); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java new file mode 100644 index 0000000..44766bd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.expr.fn; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; +import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch; +import org.apache.drill.exec.physical.impl.aggregate.StreamingAggTemplate; +import org.apache.drill.exec.record.VectorAccessibleComplexWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; + +import com.sun.codemodel.JBlock; +import com.sun.codemodel.JClass; +import com.sun.codemodel.JExpr; +import com.sun.codemodel.JExpression; +import com.sun.codemodel.JInvocation; +import com.sun.codemodel.JVar; +import com.sun.codemodel.JMod; + +public class DrillComplexWriterAggFuncHolder extends DrillAggFuncHolder { + + // Complex writer to write out complex data-types e.g. repeated maps/lists + private JVar complexWriter; + // The index at which to write - important when group-by is present. Implicit assumption that the output indexes + // will be sequential starting from 0. i.e. the first group would be written at index 0, second group at index 1 + // and so on. + private JVar writerIdx; + private JVar lastWriterIdx; + public DrillComplexWriterAggFuncHolder(FunctionAttributes functionAttributes, FunctionInitializer initializer) { + super(functionAttributes, initializer); + } + + @Override + public boolean isComplexWriterFuncHolder() { + return true; + } + + @Override + public JVar[] renderStart(ClassGenerator<?> classGenerator, HoldingContainer[] inputVariables, FieldReference fieldReference) { + if (!classGenerator.getMappingSet().isHashAggMapping()) { //Declare workspace vars for non-hash-aggregation. + JInvocation container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer"); + + complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class)); + writerIdx = classGenerator.declareClassField("writerIdx", classGenerator.getModel()._ref(int.class)); + lastWriterIdx = classGenerator.declareClassField("lastWriterIdx", classGenerator.getModel()._ref(int.class)); + //Default name is "col", if not passed in a reference name for the output vector. + String refName = fieldReference == null ? "col" : fieldReference.getRootSegment().getPath(); + JClass cwClass = classGenerator.getModel().ref(VectorAccessibleComplexWriter.class); + classGenerator.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container)); + classGenerator.getSetupBlock().assign(writerIdx, JExpr.lit(0)); + classGenerator.getSetupBlock().assign(lastWriterIdx, JExpr.lit(-1)); + + JVar[] workspaceJVars = declareWorkspaceVariables(classGenerator); + generateBody(classGenerator, ClassGenerator.BlockType.SETUP, setup(), null, workspaceJVars, true); + return workspaceJVars; + } else { + return super.renderStart(classGenerator, inputVariables, fieldReference); + } + } + + @Override + public void renderMiddle(ClassGenerator<?> classGenerator, HoldingContainer[] inputVariables, JVar[] workspaceJVars) { + + classGenerator.getEvalBlock().directStatement(String.format("//---- start of eval portion of %s function. ----//", + getRegisteredNames()[0])); + + JBlock sub = new JBlock(true, true); + JBlock topSub = sub; + JClass aggBatchClass = null; + + if (classGenerator.getCodeGenerator().getDefinition() == StreamingAggTemplate.TEMPLATE_DEFINITION) { + aggBatchClass = classGenerator.getModel().ref(StreamingAggBatch.class); + } + assert aggBatchClass != null : "ComplexWriterAggFuncHolder should only be used with an Aggregate Operator"; + + JExpression aggBatch = JExpr.cast(aggBatchClass, classGenerator.getMappingSet().getOutgoing()); + + classGenerator.getSetupBlock().add(aggBatch.invoke("addComplexWriter").arg(complexWriter)); + // Only set the writer if there is a position change. Calling setPosition may cause underlying writers to allocate + // new vectors, thereby, losing the previously stored values + JBlock condAssignCW = classGenerator.getEvalBlock()._if(lastWriterIdx.ne(writerIdx))._then(); + condAssignCW.add(complexWriter.invoke("setPosition").arg(writerIdx)); + condAssignCW.assign(lastWriterIdx, writerIdx); + sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter); + + // add the subblock after the out declaration. + classGenerator.getEvalBlock().add(topSub); + + addProtectedBlock(classGenerator, sub, add(), inputVariables, workspaceJVars, false); + classGenerator.getEvalBlock().directStatement(String.format("//---- end of eval portion of %s function. ----//", + getRegisteredNames()[0])); + } + + @Override + public HoldingContainer renderEnd(ClassGenerator<?> classGenerator, HoldingContainer[] inputVariables, + JVar[] workspaceJVars, FieldReference fieldReference) { + HoldingContainer out = null; + JVar internalOutput = null; + if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) { + out = classGenerator.declare(getReturnType(), false); + } + JBlock sub = new JBlock(); + if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) { + internalOutput = sub.decl(JMod.FINAL, classGenerator.getHolderType(getReturnType()), getReturnValue().getName(), + JExpr._new(classGenerator.getHolderType(getReturnType()))); + } + classGenerator.getEvalBlock().add(sub); + if (getReturnType().getMinorType() == TypeProtos.MinorType.LATE) { + sub.assignPlus(writerIdx, JExpr.lit(1)); + } + addProtectedBlock(classGenerator, sub, output(), null, workspaceJVars, false); + if (getReturnType().getMinorType() != TypeProtos.MinorType.LATE) { + sub.assign(out.getHolder(), internalOutput); + } + //hash aggregate uses workspace vectors. Initialization is done in "setup" and does not require "reset" block. + if (!classGenerator.getMappingSet().isHashAggMapping()) { + generateBody(classGenerator, ClassGenerator.BlockType.RESET, reset(), null, workspaceJVars, false); + } + generateBody(classGenerator, ClassGenerator.BlockType.CLEANUP, cleanup(), null, workspaceJVars, false); + + return out; + } +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java index 9df5305..240ff27 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java @@ -23,8 +23,10 @@ import java.util.List; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.FunctionHolderExpression; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; @@ -36,6 +38,9 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.DrillFuncHolderExpr; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; +import org.apache.drill.exec.expr.holders.ListHolder; +import org.apache.drill.exec.expr.holders.MapHolder; +import org.apache.drill.exec.expr.holders.RepeatedMapHolder; import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.vector.complex.reader.FieldReader; @@ -80,7 +85,7 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder { } @Override - public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables) { + public JVar[] renderStart(ClassGenerator<?> g, HoldingContainer[] inputVariables, FieldReference fieldReference) { return declareWorkspaceVariables(g); } @@ -186,12 +191,35 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder { ValueReference parameter = attributes.getParameters()[i]; HoldingContainer inputVariable = inputVariables[i]; - if (parameter.isFieldReader() && ! inputVariable.isReader() && ! Types.isComplex(inputVariable.getMajorType()) && inputVariable.getMinorType() != MinorType.UNION) { + if (parameter.isFieldReader() && ! inputVariable.isReader() + && ! Types.isComplex(inputVariable.getMajorType()) && inputVariable.getMinorType() != MinorType.UNION) { JType singularReaderClass = g.getModel()._ref(TypeHelper.getHolderReaderImpl(inputVariable.getMajorType().getMinorType(), inputVariable.getMajorType().getMode())); JType fieldReadClass = g.getModel()._ref(FieldReader.class); sub.decl(fieldReadClass, parameter.getName(), JExpr._new(singularReaderClass).arg(inputVariable.getHolder())); - } else { + } else if (!parameter.isFieldReader() && inputVariable.isReader() && Types.isComplex(parameter.getType())) { + // For complex data-types (repeated maps/lists) the input to the aggregate will be a FieldReader. However, aggregate + // functions like ANY_VALUE, will assume the input to be a RepeatedMapHolder etc. Generate boilerplate code, to map + // from FieldReader to respective Holder. + if (parameter.getType().getMinorType() == MinorType.MAP) { + JType holderClass; + if (parameter.getType().getMode() == TypeProtos.DataMode.REPEATED) { + holderClass = g.getModel()._ref(RepeatedMapHolder.class); + JVar holderVar = sub.decl(holderClass, parameter.getName(), JExpr._new(holderClass)); + sub.assign(holderVar.ref("reader"), inputVariable.getHolder()); + } else { + holderClass = g.getModel()._ref(MapHolder.class); + JVar holderVar = sub.decl(holderClass, parameter.getName(), JExpr._new(holderClass)); + sub.assign(holderVar.ref("reader"), inputVariable.getHolder()); + } + } else if (parameter.getType().getMinorType() == MinorType.LIST) { + //TODO: Add support for REPEATED LISTs + JType holderClass = g.getModel()._ref(ListHolder.class); + JVar holderVar = sub.decl(holderClass, parameter.getName(), JExpr._new(holderClass)); + sub.assign(holderVar.ref("reader"), inputVariable.getHolder()); + } + } + else { sub.decl(inputVariable.getHolder().type(), parameter.getName(), inputVariable.getHolder()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java index ca5605a..b5a2f07 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionConverter.java @@ -181,7 +181,9 @@ public class FunctionConverter { switch (template.scope()) { case POINT_AGGREGATE: - return new DrillAggFuncHolder(functionAttributes, initializer); + return outputField.isComplexWriter() ? + new DrillComplexWriterAggFuncHolder(functionAttributes, initializer) : + new DrillAggFuncHolder(functionAttributes, initializer); case SIMPLE: return outputField.isComplexWriter() ? new DrillComplexWriterFuncHolder(functionAttributes, initializer) : diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java index 703d62e..3db9f5a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Mappify.java @@ -60,7 +60,7 @@ public class Mappify { } public void eval() { - buffer = org.apache.drill.exec.expr.fn.impl.MappifyUtility.mappify(reader, writer, buffer); + buffer = org.apache.drill.exec.expr.fn.impl.MappifyUtility.mappify(reader, writer, buffer, "Mappify/kvgen"); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java index 3745fe2..b3fca2b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java @@ -37,7 +37,7 @@ public class MappifyUtility { public static final String fieldKey = "key"; public static final String fieldValue = "value"; - public static DrillBuf mappify(FieldReader reader, BaseWriter.ComplexWriter writer, DrillBuf buffer) { + public static DrillBuf mappify(FieldReader reader, BaseWriter.ComplexWriter writer, DrillBuf buffer, String caller) { // Currently we expect single map as input if (DataMode.REPEATED == reader.getType().getMode() || !(reader.getType().getMinorType() == TypeProtos.MinorType.MAP)) { throw new DrillRuntimeException("kvgen function only supports Simple maps as input"); @@ -72,7 +72,7 @@ public class MappifyUtility { mapWriter.varChar(fieldKey).write(vh); // Write the value to the map - MapUtility.writeToMapFromReader(fieldReader, mapWriter); + MapUtility.writeToMapFromReader(fieldReader, mapWriter, caller); mapWriter.end(); } @@ -80,5 +80,35 @@ public class MappifyUtility { return buffer; } + + public static void createRepeatedMapOrList(FieldReader reader, BaseWriter.ComplexWriter writer, String caller) { + if (DataMode.REPEATED != reader.getType().getMode()) { + throw new DrillRuntimeException("Do not invoke createRepeatedMapOrList() unless MINOR mode is REPEATED"); + } + BaseWriter.ListWriter listWriter = writer.rootAsList(); + MapUtility.writeToListFromReader(reader, listWriter, caller); + } + + public static void createMap(FieldReader reader, BaseWriter.ComplexWriter writer, String caller) { + if (DataMode.REPEATED == reader.getType().getMode()) { + throw new DrillRuntimeException("Do not invoke createMap() with REPEATED MINOR mode"); + } + if (reader.getType().getMinorType() == TypeProtos.MinorType.MAP) { + BaseWriter.MapWriter mapWriter = writer.rootAsMap(); + // Iterate over the fields in the map + Iterator<String> fieldIterator = reader.iterator(); + while (fieldIterator.hasNext()) { + String field = fieldIterator.next(); + FieldReader fieldReader = reader.reader(field); + // Write the value to the map + MapUtility.writeToMapFromReader(fieldReader, mapWriter, field, caller); + } + } + } + + public static void createList(FieldReader reader, BaseWriter.ComplexWriter writer, String caller) { + BaseWriter.ListWriter listWriter = writer.rootAsList(); + MapUtility.writeToListFromReader(reader, listWriter, caller); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 34ab97e..caeed50 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -18,7 +18,9 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; +import java.util.List; +import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; @@ -35,6 +37,7 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.DrillFuncHolderExpr; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.HoldingContainerExpression; import org.apache.drill.exec.expr.TypeHelper; @@ -50,21 +53,26 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; +import org.apache.drill.exec.vector.UntypedNullHolder; +import org.apache.drill.exec.vector.UntypedNullVector; import org.apache.drill.exec.vector.ValueVector; import com.sun.codemodel.JExpr; import com.sun.codemodel.JVar; +import org.apache.drill.exec.vector.complex.writer.BaseWriter; public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class); private StreamingAggregator aggregator; private final RecordBatch incoming; + private List<BaseWriter.ComplexWriter> complexWriters; private boolean done = false; private boolean first = true; private int recordCount = 0; @@ -107,6 +115,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } @Override + public VectorContainer getOutgoingContainer() { + return this.container; + } + + @Override public void buildSchema() throws SchemaChangeException { IterOutcome outcome = next(incoming); switch (outcome) { @@ -131,6 +144,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { for (final VectorWrapper<?> w : container) { w.getValueVector().allocateNew(); } + + if (complexWriters != null) { + container.buildSchema(SelectionVectorMode.NONE); + } } @Override @@ -177,7 +194,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { throw new IllegalStateException(String.format("unknown outcome %s", outcome)); } } - AggOutcome out = aggregator.doWork(); recordCount = aggregator.getOutputCount(); logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); @@ -191,6 +207,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { // fall through case RETURN_OUTCOME: IterOutcome outcome = aggregator.getOutcome(); + // In case of complex writer expression, vectors would be added to batch run-time. + // We have to re-build the schema. + if (complexWriters != null) { + container.buildSchema(SelectionVectorMode.NONE); + } if (outcome == IterOutcome.NONE && first) { first = false; done = true; @@ -213,6 +234,14 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } } + private void allocateComplexWriters() { + // Allocate the complex writers before processing the incoming batch + if (complexWriters != null) { + for (final BaseWriter.ComplexWriter writer : complexWriters) { + writer.allocate(); + } + } + } /** * Method is invoked when we have a straight aggregate (no group by expression) and our input is empty. @@ -272,9 +301,15 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } } + public void addComplexWriter(final BaseWriter.ComplexWriter writer) { + complexWriters.add(writer); + } + private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{ ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); + // Uncomment out this line to debug the generated code. + //cg.getCodeGenerator().saveCodeForDebugging(true); container.clear(); LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()]; @@ -307,12 +342,29 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { continue; } - final MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(), - expr.getMajorType()); - @SuppressWarnings("resource") - ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); - TypedFieldId id = container.add(vector); - valueExprs[i] = new ValueVectorWriteExpression(id, expr, true); + /* Populate the complex writers for complex exprs */ + if (expr instanceof DrillFuncHolderExpr && + ((DrillFuncHolderExpr) expr).getHolder().isComplexWriterFuncHolder()) { + // Need to process ComplexWriter function evaluation. + // Lazy initialization of the list of complex writers, if not done yet. + if (complexWriters == null) { + complexWriters = Lists.newArrayList(); + } else { + complexWriters.clear(); + } + // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer. + ((DrillFuncHolderExpr) expr).getFieldReference(ne.getRef()); + MaterializedField field = MaterializedField.create(ne.getRef().getAsNamePart().getName(), UntypedNullHolder.TYPE); + container.add(new UntypedNullVector(field, container.getAllocator())); + valueExprs[i] = expr; + } else { + final MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(), + expr.getMajorType()); + @SuppressWarnings("resource") + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + TypedFieldId id = container.add(vector); + valueExprs[i] = new ValueVectorWriteExpression(id, expr, true); + } } if (collector.hasErrors()) { @@ -331,6 +383,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { container.buildSchema(SelectionVectorMode.NONE); StreamingAggregator agg = context.getImplementationClass(cg); agg.setup(oContext, incoming, this); + allocateComplexWriters(); return agg; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java index 02dd4de..19499d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.planner.physical; import com.google.common.collect.Lists; +import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.util.ImmutableBitSet; import org.apache.drill.exec.planner.logical.DrillAggregateRel; import org.apache.drill.exec.planner.logical.RelOptHelper; @@ -58,7 +59,8 @@ public class HashAggPrule extends AggPruleBase { final DrillAggregateRel aggregate = call.rel(0); final RelNode input = call.rel(1); - if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) { + if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0 + || requiresStreamingAgg(aggregate)) { // currently, don't use HashAggregate if any of the logical aggrs contains DISTINCT or // if there are no grouping keys return; @@ -101,6 +103,16 @@ public class HashAggPrule extends AggPruleBase { } } + private boolean requiresStreamingAgg(DrillAggregateRel aggregate) { + //If contains ANY_VALUE aggregate, using HashAgg would not work + for (AggregateCall agg : aggregate.getAggCallList()) { + if (agg.getAggregation().getName().equalsIgnoreCase("any_value")) { + return true; + } + } + return false; + } + private class TwoPhaseSubset extends SubsetTransformer<DrillAggregateRel, InvalidRelException> { final RelTrait distOnAllKeys; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java index 543a6db..f4d29e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java @@ -28,14 +28,14 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader; import org.apache.drill.exec.vector.complex.writer.BaseWriter; public class MapUtility { - private final static String TYPE_MISMATCH_ERROR = "Mappify/kvgen does not support heterogeneous value types. All values in the input map must be of the same type. The field [%s] has a differing type [%s]."; + private final static String TYPE_MISMATCH_ERROR = " does not support heterogeneous value types. All values in the input map must be of the same type. The field [%s] has a differing type [%s]."; /* * Function to read a value from the field reader, detect the type, construct the appropriate value holder * and use the value holder to write to the Map. */ // TODO : This should be templatized and generated using freemarker - public static void writeToMapFromReader(FieldReader fieldReader, BaseWriter.MapWriter mapWriter) { + public static void writeToMapFromReader(FieldReader fieldReader, BaseWriter.MapWriter mapWriter, String caller) { try { MajorType valueMajorType = fieldReader.getType(); MinorType valueMinorType = valueMajorType.getMinorType(); @@ -228,11 +228,311 @@ public class MapUtility { fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).list()); break; default: - throw new DrillRuntimeException(String.format("kvgen does not support input of type: %s", valueMinorType)); + throw new DrillRuntimeException(String.format(caller + + " does not support input of type: %s", valueMinorType)); } } catch (ClassCastException e) { final MaterializedField field = fieldReader.getField(); - throw new DrillRuntimeException(String.format(TYPE_MISMATCH_ERROR, field.getName(), field.getType())); + throw new DrillRuntimeException(String.format(caller + TYPE_MISMATCH_ERROR, field.getName(), field.getType())); + } + } + + public static void writeToMapFromReader(FieldReader fieldReader, BaseWriter.MapWriter mapWriter, + String fieldName, String caller) { + try { + MajorType valueMajorType = fieldReader.getType(); + MinorType valueMinorType = valueMajorType.getMinorType(); + boolean repeated = false; + + if (valueMajorType.getMode() == TypeProtos.DataMode.REPEATED) { + repeated = true; + } + + switch (valueMinorType) { + case TINYINT: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).tinyInt()); + } else { + fieldReader.copyAsValue(mapWriter.tinyInt(fieldName)); + } + break; + case SMALLINT: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).smallInt()); + } else { + fieldReader.copyAsValue(mapWriter.smallInt(fieldName)); + } + break; + case BIGINT: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).bigInt()); + } else { + fieldReader.copyAsValue(mapWriter.bigInt(fieldName)); + } + break; + case INT: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).integer()); + } else { + fieldReader.copyAsValue(mapWriter.integer(fieldName)); + } + break; + case UINT1: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).uInt1()); + } else { + fieldReader.copyAsValue(mapWriter.uInt1(fieldName)); + } + break; + case UINT2: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).uInt2()); + } else { + fieldReader.copyAsValue(mapWriter.uInt2(fieldName)); + } + break; + case UINT4: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).uInt4()); + } else { + fieldReader.copyAsValue(mapWriter.uInt4(fieldName)); + } + break; + case UINT8: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).uInt8()); + } else { + fieldReader.copyAsValue(mapWriter.uInt8(fieldName)); + } + break; + case DECIMAL9: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).decimal9()); + } else { + fieldReader.copyAsValue(mapWriter.decimal9(fieldName)); + } + break; + case DECIMAL18: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).decimal18()); + } else { + fieldReader.copyAsValue(mapWriter.decimal18(fieldName)); + } + break; + case DECIMAL28SPARSE: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).decimal28Sparse()); + } else { + fieldReader.copyAsValue(mapWriter.decimal28Sparse(fieldName)); + } + break; + case DECIMAL38SPARSE: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).decimal38Sparse()); + } else { + fieldReader.copyAsValue(mapWriter.decimal38Sparse(fieldName)); + } + break; + case VARDECIMAL: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).varDecimal(valueMajorType.getScale(), valueMajorType.getPrecision())); + } else { + fieldReader.copyAsValue(mapWriter.varDecimal(fieldName, valueMajorType.getScale(), valueMajorType.getPrecision())); + } + break; + case DATE: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).date()); + } else { + fieldReader.copyAsValue(mapWriter.date(fieldName)); + } + break; + case TIME: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).time()); + } else { + fieldReader.copyAsValue(mapWriter.time(fieldName)); + } + break; + case TIMESTAMP: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).timeStamp()); + } else { + fieldReader.copyAsValue(mapWriter.timeStamp(fieldName)); + } + break; + case INTERVAL: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).interval()); + } else { + fieldReader.copyAsValue(mapWriter.interval(fieldName)); + } + break; + case INTERVALDAY: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).intervalDay()); + } else { + fieldReader.copyAsValue(mapWriter.intervalDay(fieldName)); + } + break; + case INTERVALYEAR: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).intervalYear()); + } else { + fieldReader.copyAsValue(mapWriter.intervalYear(fieldName)); + } + break; + case FLOAT4: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).float4()); + } else { + fieldReader.copyAsValue(mapWriter.float4(fieldName)); + } + break; + case FLOAT8: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).float8()); + } else { + fieldReader.copyAsValue(mapWriter.float8(fieldName)); + } + break; + case BIT: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).bit()); + } else { + fieldReader.copyAsValue(mapWriter.bit(fieldName)); + } + break; + case VARCHAR: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).varChar()); + } else { + fieldReader.copyAsValue(mapWriter.varChar(fieldName)); + } + break; + case VARBINARY: + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(fieldName).varBinary()); + } else { + fieldReader.copyAsValue(mapWriter.varBinary(fieldName)); + } + break; + case MAP: + if (valueMajorType.getMode() == TypeProtos.DataMode.REPEATED) { + fieldReader.copyAsValue(mapWriter.list(fieldName).map()); + } else { + fieldReader.copyAsValue(mapWriter.map(fieldName)); + } + break; + case LIST: + fieldReader.copyAsValue(mapWriter.list(fieldName).list()); + break; + default: + throw new DrillRuntimeException(String.format(caller + + " does not support input of type: %s", valueMinorType)); + } + } catch (ClassCastException e) { + final MaterializedField field = fieldReader.getField(); + throw new DrillRuntimeException(String.format(caller + TYPE_MISMATCH_ERROR, field.getName(), field.getType())); + } + } + + public static void writeToListFromReader(FieldReader fieldReader, BaseWriter.ListWriter listWriter, String caller) { + try { + MajorType valueMajorType = fieldReader.getType(); + MinorType valueMinorType = valueMajorType.getMinorType(); + boolean repeated = false; + + if (valueMajorType.getMode() == TypeProtos.DataMode.REPEATED) { + repeated = true; + } + + switch (valueMinorType) { + case TINYINT: + fieldReader.copyAsValue(listWriter.tinyInt()); + break; + case SMALLINT: + fieldReader.copyAsValue(listWriter.smallInt()); + break; + case BIGINT: + fieldReader.copyAsValue(listWriter.bigInt()); + break; + case INT: + fieldReader.copyAsValue(listWriter.integer()); + break; + case UINT1: + fieldReader.copyAsValue(listWriter.uInt1()); + break; + case UINT2: + fieldReader.copyAsValue(listWriter.uInt2()); + break; + case UINT4: + fieldReader.copyAsValue(listWriter.uInt4()); + break; + case UINT8: + fieldReader.copyAsValue(listWriter.uInt8()); + break; + case DECIMAL9: + fieldReader.copyAsValue(listWriter.decimal9()); + break; + case DECIMAL18: + fieldReader.copyAsValue(listWriter.decimal18()); + break; + case DECIMAL28SPARSE: + fieldReader.copyAsValue(listWriter.decimal28Sparse()); + break; + case DECIMAL38SPARSE: + fieldReader.copyAsValue(listWriter.decimal38Sparse()); + break; + case VARDECIMAL: + fieldReader.copyAsValue(listWriter.varDecimal(valueMajorType.getScale(), valueMajorType.getPrecision())); + break; + case DATE: + fieldReader.copyAsValue(listWriter.date()); + break; + case TIME: + fieldReader.copyAsValue(listWriter.time()); + break; + case TIMESTAMP: + fieldReader.copyAsValue(listWriter.timeStamp()); + break; + case INTERVAL: + fieldReader.copyAsValue(listWriter.interval()); + break; + case INTERVALDAY: + fieldReader.copyAsValue(listWriter.intervalDay()); + break; + case INTERVALYEAR: + fieldReader.copyAsValue(listWriter.intervalYear()); + break; + case FLOAT4: + fieldReader.copyAsValue(listWriter.float4()); + break; + case FLOAT8: + fieldReader.copyAsValue(listWriter.float8()); + break; + case BIT: + fieldReader.copyAsValue(listWriter.bit()); + break; + case VARCHAR: + fieldReader.copyAsValue(listWriter.varChar()); + break; + case VARBINARY: + fieldReader.copyAsValue(listWriter.varBinary()); + break; + case MAP: + fieldReader.copyAsValue(listWriter.map()); + break; + case LIST: + fieldReader.copyAsValue(listWriter.list()); + break; + default: + throw new DrillRuntimeException(String.format(caller + + " function does not support input of type: %s", valueMinorType)); + } + } catch (ClassCastException e) { + final MaterializedField field = fieldReader.getField(); + throw new DrillRuntimeException(String.format(caller + TYPE_MISMATCH_ERROR, field.getName(), field.getType())); } } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java new file mode 100644 index 0000000..37c0b52 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.impl.agg; + +import com.google.common.collect.Lists; +import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase; +import org.apache.drill.exec.util.JsonStringArrayList; +import org.apache.drill.test.BaseTestQuery; +import org.apache.drill.categories.OperatorTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.drill.test.TestBuilder; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; + +import java.math.BigDecimal; +import java.util.List; + +@Category(OperatorTest.class) +@RunWith(Enclosed.class) +public class TestAggWithAnyValue { + + public static class TestAggWithAnyValueMultipleBatches extends PhysicalOpUnitTestBase { + + @Test + public void testStreamAggWithGroupBy() { + StreamingAggregate aggConf = new StreamingAggregate(null, parseExprs("age.`max`", "age"), parseExprs("any_value(a)", "any_a"), 2.0f); + List<String> inputJsonBatches = Lists.newArrayList( + "[{ \"age\": {\"min\":20, \"max\":60}, \"city\": \"San Bruno\", \"de\": \"987654321987654321987654321.10987654321\"," + + " \"a\": [{\"b\":50, \"c\":30},{\"b\":70, \"c\":40}], \"m\": [{\"n\": [10, 11, 12]}], \"f\": [{\"g\": {\"h\": [{\"k\": 70}, {\"k\": 80}]}}]," + + "\"p\": {\"q\": [21, 22, 23]}" + "}, " + + "{ \"age\": {\"min\":20, \"max\":60}, \"city\": \"Castro Valley\", \"de\": \"987654321987654321987654321.12987654321\"," + + " \"a\": [{\"b\":60, \"c\":40},{\"b\":80, \"c\":50}], \"m\": [{\"n\": [13, 14, 15]}], \"f\": [{\"g\": {\"h\": [{\"k\": 90}, {\"k\": 100}]}}]," + + "\"p\": {\"q\": [24, 25, 26]}" + "}]", + "[{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"Palo Alto\", \"de\": \"987654321987654321987654321.00987654321\"," + + " \"a\": [{\"b\":10, \"c\":15}, {\"b\":20, \"c\":45}], \"m\": [{\"n\": [1, 2, 3]}], \"f\": [{\"g\": {\"h\": [{\"k\": 10}, {\"k\": 20}]}}]," + + "\"p\": {\"q\": [27, 28, 29]}" + "}, " + + "{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"San Carlos\", \"de\": \"987654321987654321987654321.11987654321\"," + + " \"a\": [{\"b\":30, \"c\":25}, {\"b\":40, \"c\":55}], \"m\": [{\"n\": [4, 5, 6]}], \"f\": [{\"g\": {\"h\": [{\"k\": 30}, {\"k\": 40}]}}]," + + "\"p\": {\"q\": [30, 31, 32]}" + "}, " + + "{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"Palo Alto\", \"de\": \"987654321987654321987654321.13987654321\"," + + " \"a\": [{\"b\":70, \"c\":85}, {\"b\":90, \"c\":145}], \"m\": [{\"n\": [7, 8, 9]}], \"f\": [{\"g\": {\"h\": [{\"k\": 50}, {\"k\": 60}]}}]," + + "\"p\": {\"q\": [33, 34, 35]}" + "}]"); + opTestBuilder() + .physicalOperator(aggConf) + .inputDataStreamJson(inputJsonBatches) + .baselineColumns("age", "any_a") + .baselineValues(60l, TestBuilder.listOf(TestBuilder.mapOf("b", 50l, "c", 30l), TestBuilder.mapOf("b", 70l, "c", 40l))) + .baselineValues(80l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l))) + .go(); + } + } + + public static class TestAggWithAnyValueSingleBatch extends BaseTestQuery { + + @Test + public void testWithGroupBy() throws Exception { + String query = "select t1.age.`max` as age, count(*) as cnt, any_value(t1.a) as any_a, any_value(t1.city) as any_city, " + + "any_value(f) as any_f, any_value(m) as any_m, any_value(p) as any_p from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`"; + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("age", "cnt", "any_a", "any_city", "any_f", "any_m", "any_p") + .baselineValues(60l, 2l, TestBuilder.listOf(TestBuilder.mapOf("b", 50l, "c", 30l), TestBuilder.mapOf("b", 70l, "c", 40l)), "San Bruno", + TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 70l), TestBuilder.mapOf("k", 80l))))), + TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(10l, 11l, 12l))), + TestBuilder.mapOf("q", TestBuilder.listOf(21l, 22l, 23l))) + .baselineValues(80l, 3l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l)), "Palo Alto", + TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 10l), TestBuilder.mapOf("k", 20l))))), + TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(1l, 2l, 3l))), + TestBuilder.mapOf("q", TestBuilder.listOf(27l, 28l, 29l))) + .go(); + } + + @Test + public void testWithoutGroupBy() throws Exception { + String query = "select count(*) as cnt, any_value(t1.a) as any_a, any_value(t1.city) as any_city, " + + "any_value(f) as any_f, any_value(m) as any_m, any_value(p) as any_p from cp.`store/json/test_anyvalue.json` t1"; + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("cnt", "any_a", "any_city", "any_f", "any_m", "any_p") + .baselineValues(5l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l)), "Palo Alto", + TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 10l), TestBuilder.mapOf("k", 20l))))), + TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(1l, 2l, 3l))), + TestBuilder.mapOf("q", TestBuilder.listOf(27l, 28l, 29l))) + .go(); + } + + @Test + public void testDecimalWithGroupBy() throws Exception { + String query = "select t1.age.`max` as age, any_value(cast(t1.de as decimal(38, 11))) as any_decimal " + + "from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`"; + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("age", "any_decimal") + .baselineValues(60l, new BigDecimal("987654321987654321987654321.10987654321")) + .baselineValues(80l, new BigDecimal("987654321987654321987654321.00987654321")) + .go(); + } + + @Test + public void testRepeatedDecimalWithGroupBy() throws Exception { + JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>(); + ints.add(new BigDecimal("999999.999")); + ints.add(new BigDecimal("-999999.999")); + ints.add(new BigDecimal("0.000")); + + JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>(); + longs.add(new BigDecimal("999999999.999999999")); + longs.add(new BigDecimal("-999999999.999999999")); + longs.add(new BigDecimal("0.000000000")); + + JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>(); + fixedLen.add(new BigDecimal("999999999999.999999")); + fixedLen.add(new BigDecimal("-999999999999.999999")); + fixedLen.add(new BigDecimal("0.000000")); + + String query = "select any_value(decimal_int32) as any_dec_32, any_value(decimal_int64) as any_dec_64," + + " any_value(decimal_fixedLen) as any_dec_fixed, any_value(decimal_binary) as any_dec_bin" + + " from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`"; + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("any_dec_32", "any_dec_64", "any_dec_fixed", "any_dec_bin") + .baselineValues(ints, longs, fixedLen, fixedLen) + .go(); + } + } +} \ No newline at end of file diff --git a/exec/java-exec/src/test/resources/store/json/test_anyvalue.json b/exec/java-exec/src/test/resources/store/json/test_anyvalue.json new file mode 100644 index 0000000..8e7bef9 --- /dev/null +++ b/exec/java-exec/src/test/resources/store/json/test_anyvalue.json @@ -0,0 +1,50 @@ +{ + "age": {"min":43, "max":80}, + "city": "Palo Alto", + "de": "987654321987654321987654321.00987654321", + "delist": ["987654321987654321987654321.19987654321", "987654321987654321987654321.20987654321"], + "a": [{"b":10, "c":15}, {"b":20, "c":45}], + "m": [{"n": [1, 2, 3]}], + "f": [{"g": {"h": [{"k": 10}, {"k": 20}]}}], + "p": {"q" : [27, 28, 29]} +} +{ + "age": {"min":20, "max":60}, + "city": "San Bruno", + "de": "987654321987654321987654321.10987654321", + "delist": ["987654321987654321987654321.17987654321", "987654321987654321987654321.18987654321"], + "a": [{"b":50, "c":30},{"b":70, "c":40}], + "m": [{"n": [10, 11, 12]}], + "f": [{"g": {"h": [{"k": 70}, {"k": 80}]}}], + "p": {"q" : [21, 22, 23]} +} +{ + "age": {"min":43, "max":80}, + "city": "San Carlos", + "de": "987654321987654321987654321.11987654321", + "delist": ["987654321987654321987654321.11987654321", "987654321987654321987654321.12987654321"], + "a": [{"b":30, "c":25}, {"b":40, "c":55}], + "m": [{"n": [4, 5, 6]}], + "f": [{"g": {"h": [{"k": 30}, {"k": 40}]}}], + "p": {"q" : [30, 31, 32]} +} +{ + "age": {"min":20, "max":60}, + "city": "Castro Valley", + "de": "987654321987654321987654321.12987654321", + "delist": ["987654321987654321987654321.13987654321", "987654321987654321987654321.14987654321"], + "a": [{"b":60, "c":40},{"b":80, "c":50}], + "m": [{"n": [13, 14, 15]}], + "f": [{"g": {"h": [{"k": 90}, {"k": 100}]}}], + "p": {"q" : [24, 25, 26]} +} +{ + "age": {"min":43, "max":80}, + "city": "Palo Alto", + "de": "987654321987654321987654321.13987654321", + "delist": ["987654321987654321987654321.15987654321", "987654321987654321987654321.16987654321"], + "a": [{"b":70, "c":85}, {"b":90, "c":145}], + "m": [{"n": [7, 8, 9]}], + "f": [{"g": {"h": [{"k": 50}, {"k": 60}]}}], + "p": {"q" : [33, 34, 35]} +} \ No newline at end of file diff --git a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java index 4e6edb5..037332f 100644 --- a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java +++ b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java @@ -307,6 +307,7 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector holder.start = offsets.getAccessor().get(index); holder.end = offsets.getAccessor().get(index+1); holder.vector = values; + holder.reader = reader; } public void get(int index, int positionIndex, ${minor.class}Holder holder) { diff --git a/exec/vector/src/main/codegen/templates/ValueHolders.java b/exec/vector/src/main/codegen/templates/ValueHolders.java index 7635895..8562d1b 100644 --- a/exec/vector/src/main/codegen/templates/ValueHolders.java +++ b/exec/vector/src/main/codegen/templates/ValueHolders.java @@ -49,16 +49,17 @@ public final class ${className} implements ValueHolder { /** The first index (inclusive) into the Vector. **/ public int start; - + /** The last index (exclusive) into the Vector. **/ public int end; - + /** The Vector holding the actual values. **/ public ${minor.class}Vector vector; - - <#else> + + public FieldReader reader; +<#else> public static final int WIDTH = ${type.width}; - + <#if mode.name == "Optional">public int isSet;</#if> <#assign fields = minor.fields!type.fields /> <#list fields as field> diff --git a/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java b/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java index 52f590a..ce7e34d 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.expr.holders; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.vector.complex.ListVector; +import org.apache.drill.exec.vector.complex.reader.FieldReader; public final class RepeatedListHolder implements ValueHolder{ @@ -36,4 +37,5 @@ public final class RepeatedListHolder implements ValueHolder{ /** The Vector holding the actual values. **/ public ListVector vector; + public FieldReader reader; } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java b/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java index f8acaeb..516d135 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.expr.holders; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.vector.complex.MapVector; +import org.apache.drill.exec.vector.complex.reader.FieldReader; public final class RepeatedMapHolder implements ValueHolder{ @@ -38,4 +39,6 @@ public final class RepeatedMapHolder implements ValueHolder{ /** The Vector holding the actual values. **/ public MapVector vector; + public FieldReader reader; + } diff --git a/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprLexer.g b/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprLexer.g index 2b497a1..93dba94 100644 --- a/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprLexer.g +++ b/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprLexer.g @@ -37,6 +37,7 @@ When : 'when'; Cast: 'cast'; Convert : 'convert_' ('from' | 'to'); +AnyValue : 'any_value' | 'ANY_VALUE'; Nullable: 'nullable'; Repeat: 'repeat'; As: 'as'; diff --git a/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g b/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g index e73bdea..78a7cc3 100644 --- a/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g +++ b/logical/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g @@ -81,6 +81,10 @@ convertCall returns [LogicalExpression e] { $e = FunctionCallFactory.createConvert($Convert.text, $String.text, $expression.e, pos($Convert));} ; +anyValueCall returns [LogicalExpression e] + : AnyValue OParen exprList? CParen {$e = FunctionCallFactory.createExpression($AnyValue.text, pos($AnyValue), $exprList.listE); } + ; + castCall returns [LogicalExpression e] @init{ List<LogicalExpression> exprs = new ArrayList<LogicalExpression>(); @@ -313,6 +317,7 @@ arraySegment returns [PathSegment seg] lookup returns [LogicalExpression e] : functionCall {$e = $functionCall.e ;} | convertCall {$e = $convertCall.e; } + | anyValueCall {$e = $anyValueCall.e; } | castCall {$e = $castCall.e; } | pathSegment {$e = new SchemaPath($pathSegment.seg, pos($pathSegment.start) ); } | String {$e = new ValueExpressions.QuotedString($String.text, $String.text.length(), pos($String) ); } diff --git a/logical/src/main/java/org/apache/drill/common/expression/AnyValueExpression.java b/logical/src/main/java/org/apache/drill/common/expression/AnyValueExpression.java new file mode 100644 index 0000000..4dff147 --- /dev/null +++ b/logical/src/main/java/org/apache/drill/common/expression/AnyValueExpression.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.common.expression; + +import org.apache.drill.common.expression.visitors.ExprVisitor; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.Types; + +import java.util.Collections; +import java.util.Iterator; + +public class AnyValueExpression extends LogicalExpressionBase implements Iterable<LogicalExpression>{ + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AnyValueExpression.class); + + public static final String ANY_VALUE = "any_value"; + private final LogicalExpression input; + private final MajorType type; + + /** + * @param input + * @param pos + */ + public AnyValueExpression(LogicalExpression input, ExpressionPosition pos) { + super(pos); + this.input = input; + this.type = input.getMajorType(); + } + + @Override + public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E> visitor, V value) throws E { + return visitor.visitAnyValueExpression(this, value); + } + + @Override + public Iterator<LogicalExpression> iterator() { + return Collections.singleton(input).iterator(); + } + + public LogicalExpression getInput() { + return input; + } + + @Override + public MajorType getMajorType() { + return type; + } + + @Override + public String toString() { + return "AnyValueExpression [input=" + input + ", type=" + Types.toString(type) + "]"; + } +} + diff --git a/logical/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java b/logical/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java index f09f887..fb5323b 100644 --- a/logical/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java +++ b/logical/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java @@ -252,6 +252,14 @@ public class ExpressionStringBuilder extends AbstractExprVisitor<Void, StringBui } @Override + public Void visitAnyValueExpression(AnyValueExpression e, StringBuilder sb) throws RuntimeException { + sb.append("any("); + e.getInput().accept(this, sb); + sb.append(")"); + return null; + } + + @Override public Void visitCastExpression(CastExpression e, StringBuilder sb) throws RuntimeException { MajorType mt = e.getMajorType(); diff --git a/logical/src/main/java/org/apache/drill/common/expression/FunctionCallFactory.java b/logical/src/main/java/org/apache/drill/common/expression/FunctionCallFactory.java index 7d9f9a1..513da4c 100644 --- a/logical/src/main/java/org/apache/drill/common/expression/FunctionCallFactory.java +++ b/logical/src/main/java/org/apache/drill/common/expression/FunctionCallFactory.java @@ -85,6 +85,14 @@ public class FunctionCallFactory { return new ConvertExpression(function, conversionType, expr, ep); } + public static LogicalExpression createAnyValue(ExpressionPosition ep, LogicalExpression expr) { + return new AnyValueExpression(expr, ep); + } + + public static LogicalExpression createAnyValue(String functionName, List<LogicalExpression> args) { + return createExpression(functionName, args); + } + public static LogicalExpression createExpression(String functionName, List<LogicalExpression> args){ return createExpression(functionName, ExpressionPosition.UNKNOWN, args); } diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/AbstractExprVisitor.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/AbstractExprVisitor.java index 8458968..18483ce 100644 --- a/logical/src/main/java/org/apache/drill/common/expression/visitors/AbstractExprVisitor.java +++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/AbstractExprVisitor.java @@ -17,6 +17,7 @@ */ package org.apache.drill.common.expression.visitors; +import org.apache.drill.common.expression.AnyValueExpression; import org.apache.drill.common.expression.BooleanOperator; import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.ConvertExpression; @@ -165,6 +166,11 @@ public abstract class AbstractExprVisitor<T, VAL, EXCEP extends Exception> imple } @Override + public T visitAnyValueExpression(AnyValueExpression e, VAL value) throws EXCEP { + return visitUnknown(e, value); + } + + @Override public T visitNullConstant(TypedNullConstant e, VAL value) throws EXCEP { return visitUnknown(e, value); } diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/AggregateChecker.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/AggregateChecker.java index 2e6b60b..ac46e42 100644 --- a/logical/src/main/java/org/apache/drill/common/expression/visitors/AggregateChecker.java +++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/AggregateChecker.java @@ -17,6 +17,7 @@ */ package org.apache.drill.common.expression.visitors; +import org.apache.drill.common.expression.AnyValueExpression; import org.apache.drill.common.expression.BooleanOperator; import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.ConvertExpression; @@ -177,6 +178,11 @@ public final class AggregateChecker implements ExprVisitor<Boolean, ErrorCollect } @Override + public Boolean visitAnyValueExpression(AnyValueExpression e, ErrorCollector errors) throws RuntimeException { + return e.getInput().accept(this, errors); + } + + @Override public Boolean visitDateConstant(DateExpression intExpr, ErrorCollector errors) { return false; } diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/ConditionalExprOptimizer.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/ConditionalExprOptimizer.java index 05e3a73..1b6eab7 100644 --- a/logical/src/main/java/org/apache/drill/common/expression/visitors/ConditionalExprOptimizer.java +++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/ConditionalExprOptimizer.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import org.apache.drill.common.expression.AnyValueExpression; import org.apache.drill.common.expression.BooleanOperator; import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.ConvertExpression; @@ -100,6 +101,12 @@ public class ConditionalExprOptimizer extends AbstractExprVisitor<LogicalExpress + "It should have been converted to FunctionHolderExpression in materialization"); } + @Override + public LogicalExpression visitAnyValueExpression(AnyValueExpression cast, Void value) throws RuntimeException { + throw new UnsupportedOperationException("AnyValueExpression is not expected here. " + + "It should have been converted to FunctionHolderExpression in materialization"); + } + private static Comparator<LogicalExpression> costComparator = new Comparator<LogicalExpression> () { public int compare(LogicalExpression e1, LogicalExpression e2) { return e1.getCumulativeCost() <= e2.getCumulativeCost() ? -1 : 1; diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/ConstantChecker.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/ConstantChecker.java index fbe7d72..a7648ac 100644 --- a/logical/src/main/java/org/apache/drill/common/expression/visitors/ConstantChecker.java +++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/ConstantChecker.java @@ -17,6 +17,7 @@ */ package org.apache.drill.common.expression.visitors; +import org.apache.drill.common.expression.AnyValueExpression; import org.apache.drill.common.expression.BooleanOperator; import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.ConvertExpression; @@ -202,6 +203,11 @@ final class ConstantChecker implements ExprVisitor<Boolean, ErrorCollector, Runt } @Override + public Boolean visitAnyValueExpression(AnyValueExpression e, ErrorCollector value) throws RuntimeException { + return e.getInput().accept(this, value); + } + + @Override public Boolean visitNullConstant(TypedNullConstant e, ErrorCollector value) throws RuntimeException { return true; } diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/ExprVisitor.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/ExprVisitor.java index c065bc8..cea83d8 100644 --- a/logical/src/main/java/org/apache/drill/common/expression/visitors/ExprVisitor.java +++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/ExprVisitor.java @@ -17,6 +17,7 @@ */ package org.apache.drill.common.expression.visitors; +import org.apache.drill.common.expression.AnyValueExpression; import org.apache.drill.common.expression.BooleanOperator; import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.ConvertExpression; @@ -75,4 +76,5 @@ public interface ExprVisitor<T, VAL, EXCEP extends Exception> { T visitConvertExpression(ConvertExpression e, VAL value) throws EXCEP; T visitParameter(ParameterExpression e, VAL value) throws EXCEP; T visitTypedFieldExpr(TypedFieldExpr e, VAL value) throws EXCEP; + T visitAnyValueExpression(AnyValueExpression e, VAL value) throws EXCEP; } diff --git a/logical/src/main/java/org/apache/drill/common/expression/visitors/ExpressionValidator.java b/logical/src/main/java/org/apache/drill/common/expression/visitors/ExpressionValidator.java index b3074fc..df72a34 100644 --- a/logical/src/main/java/org/apache/drill/common/expression/visitors/ExpressionValidator.java +++ b/logical/src/main/java/org/apache/drill/common/expression/visitors/ExpressionValidator.java @@ -17,6 +17,7 @@ */ package org.apache.drill.common.expression.visitors; +import org.apache.drill.common.expression.AnyValueExpression; import org.apache.drill.common.expression.BooleanOperator; import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.ConvertExpression; @@ -239,6 +240,12 @@ public class ExpressionValidator implements ExprVisitor<Void, ErrorCollector, Ru } @Override + public Void visitAnyValueExpression(AnyValueExpression e, ErrorCollector value) + throws RuntimeException { + return e.getInput().accept(this, value); + } + + @Override public Void visitParameter(ValueExpressions.ParameterExpression e, ErrorCollector value) throws RuntimeException { return null; } -- To stop receiving notification emails like this one, please contact [email protected].
