Repository: hive Updated Branches: refs/heads/master eb4a16448 -> 27172bcb4
HIVE-12889: Support COUNT(DISTINCT) for partitioning query. (Aihua Xu, reviewed by Szehon Ho) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/27172bcb Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/27172bcb Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/27172bcb Branch: refs/heads/master Commit: 27172bcb4d01efaa9c308ea014baf5ec9ed6208e Parents: eb4a164 Author: Aihua Xu <[email protected]> Authored: Tue Jan 19 11:24:30 2016 -0500 Committer: Aihua Xu <[email protected]> Committed: Tue Jan 26 14:28:29 2016 -0500 ---------------------------------------------------------------------- data/files/windowing_distinct.txt | 6 ++ .../functions/HiveSqlCountAggFunction.java | 10 ++- .../functions/HiveSqlSumAggFunction.java | 9 ++- .../translator/PlanModifierForASTConv.java | 2 +- .../translator/SqlFunctionConverter.java | 51 +++++++++---- .../hadoop/hive/ql/parse/CalcitePlanner.java | 4 +- .../hadoop/hive/ql/parse/IdentifiersParser.g | 2 +- .../hadoop/hive/ql/parse/PTFInvocationSpec.java | 16 +++- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 5 -- .../hadoop/hive/ql/parse/WindowingSpec.java | 22 +++++- .../hive/ql/udf/generic/GenericUDAFCount.java | 44 ++++++++++- .../queries/clientpositive/windowing_distinct.q | 30 ++++++++ .../clientpositive/windowing_distinct.q.out | 78 ++++++++++++++++++++ .../objectinspector/ObjectInspectorUtils.java | 32 ++++++++ 14 files changed, 272 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/data/files/windowing_distinct.txt ---------------------------------------------------------------------- diff --git a/data/files/windowing_distinct.txt b/data/files/windowing_distinct.txt new file mode 100644 index 0000000..9271202 --- /dev/null +++ b/data/files/windowing_distinct.txt @@ -0,0 +1,6 @@ +1|19|442|65553|4294967380|26.43|37.77|true|alice zipper|2013-03-01 09:11:58.703217|29.62|^Ahistory^B +2|124|336|65664|4294967435|74.72|42.47|true|bob davidson|2013-03-01 09:11:58.703302|45.40|^Ayard duty^B +1|19|442|65553|4294967380|26.43|37.77|true|alice zipper|2013-03-01 09:11:58.703217|29.62|^Ahistory^B +1|35|387|65619|4294967459|96.91|18.86|false|katie davidson|2013-03-01 09:11:58.703079|27.32|^Ahistory^B +2|111|372|65656|4294967312|13.01|34.95|false|xavier quirinius|2013-03-01 09:11:58.703310|23.91|^Atopology^B +2|124|336|65664|4294967435|74.72|42.47|true|bob davidson|2013-03-01 09:11:58.703302|45.40|^Ayard duty^B http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java index 7937040..58191e5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java @@ -32,11 +32,12 @@ import org.apache.calcite.util.ImmutableIntList; public class HiveSqlCountAggFunction extends SqlAggFunction { + final boolean isDistinct; final SqlReturnTypeInference returnTypeInference; final SqlOperandTypeInference operandTypeInference; final SqlOperandTypeChecker operandTypeChecker; - public HiveSqlCountAggFunction(SqlReturnTypeInference returnTypeInference, + public HiveSqlCountAggFunction(boolean isDistinct, SqlReturnTypeInference returnTypeInference, SqlOperandTypeInference operandTypeInference, SqlOperandTypeChecker operandTypeChecker) { super( "count", @@ -45,11 +46,16 @@ public class HiveSqlCountAggFunction extends SqlAggFunction { operandTypeInference, operandTypeChecker, SqlFunctionCategory.NUMERIC); + this.isDistinct = isDistinct; this.returnTypeInference = returnTypeInference; this.operandTypeChecker = operandTypeChecker; this.operandTypeInference = operandTypeInference; } + public boolean isDistinct() { + return isDistinct; + } + @Override public <T> T unwrap(Class<T> clazz) { if (clazz == SqlSplittableAggFunction.class) { @@ -64,7 +70,7 @@ public class HiveSqlCountAggFunction extends SqlAggFunction { public AggregateCall other(RelDataTypeFactory typeFactory, AggregateCall e) { return AggregateCall.create( - new HiveSqlCountAggFunction(returnTypeInference, operandTypeInference, operandTypeChecker), + new HiveSqlCountAggFunction(isDistinct, returnTypeInference, operandTypeInference, operandTypeChecker), false, ImmutableIntList.of(), -1, typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true), "count"); } http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java index 8f62970..056eaeb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java @@ -47,14 +47,14 @@ import com.google.common.collect.ImmutableList; * is the same type. */ public class HiveSqlSumAggFunction extends SqlAggFunction { - + final boolean isDistinct; final SqlReturnTypeInference returnTypeInference; final SqlOperandTypeInference operandTypeInference; final SqlOperandTypeChecker operandTypeChecker; //~ Constructors ----------------------------------------------------------- - public HiveSqlSumAggFunction(SqlReturnTypeInference returnTypeInference, + public HiveSqlSumAggFunction(boolean isDistinct, SqlReturnTypeInference returnTypeInference, SqlOperandTypeInference operandTypeInference, SqlOperandTypeChecker operandTypeChecker) { super( "sum", @@ -66,6 +66,7 @@ public class HiveSqlSumAggFunction extends SqlAggFunction { this.returnTypeInference = returnTypeInference; this.operandTypeChecker = operandTypeChecker; this.operandTypeInference = operandTypeInference; + this.isDistinct = isDistinct; } //~ Methods ---------------------------------------------------------------- @@ -85,7 +86,7 @@ public class HiveSqlSumAggFunction extends SqlAggFunction { public AggregateCall other(RelDataTypeFactory typeFactory, AggregateCall e) { RelDataType countRetType = typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true); return AggregateCall.create( - new HiveSqlCountAggFunction(ReturnTypes.explicit(countRetType), operandTypeInference, operandTypeChecker), + new HiveSqlCountAggFunction(isDistinct, ReturnTypes.explicit(countRetType), operandTypeInference, operandTypeChecker), false, ImmutableIntList.of(), -1, countRetType, "count"); } @@ -116,7 +117,7 @@ public class HiveSqlSumAggFunction extends SqlAggFunction { throw new AssertionError("unexpected count " + merges); } int ordinal = extra.register(node); - return AggregateCall.create(new HiveSqlSumAggFunction(returnTypeInference, operandTypeInference, operandTypeChecker), + return AggregateCall.create(new HiveSqlSumAggFunction(isDistinct, returnTypeInference, operandTypeInference, operandTypeChecker), false, ImmutableList.of(ordinal), -1, aggregateCall.type, aggregateCall.name); } } http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java index e2fbb4f..1a543fb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java @@ -379,7 +379,7 @@ public class PlanModifierForASTConv { RelDataType longType = TypeConverter.convert(TypeInfoFactory.longTypeInfo, typeFactory); RelDataType intType = TypeConverter.convert(TypeInfoFactory.intTypeInfo, typeFactory); // Create the dummy aggregation. - SqlAggFunction countFn = SqlFunctionConverter.getCalciteAggFn("count", + SqlAggFunction countFn = SqlFunctionConverter.getCalciteAggFn("count", false, ImmutableList.of(intType), longType); // TODO: Using 0 might be wrong; might need to walk down to find the // proper index of a dummy. http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java index 37249f9..75c38fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java @@ -217,10 +217,18 @@ public class SqlFunctionConverter { } else if (op.kind == SqlKind.PLUS_PREFIX) { node = (ASTNode) ParseDriver.adaptor.create(HiveParser.PLUS, "PLUS"); } else { - if (op.getName().toUpperCase().equals(SqlStdOperatorTable.COUNT.getName()) - && children.size() == 0) { - node = (ASTNode) ParseDriver.adaptor.create(HiveParser.TOK_FUNCTIONSTAR, + // Handle 'COUNT' function for the case of COUNT(*) and COUNT(DISTINCT) + if (op instanceof HiveSqlCountAggFunction) { + if (children.size() == 0) { + node = (ASTNode) ParseDriver.adaptor.create(HiveParser.TOK_FUNCTIONSTAR, "TOK_FUNCTIONSTAR"); + } else { + HiveSqlCountAggFunction countFunction = (HiveSqlCountAggFunction)op; + if (countFunction.isDistinct()) { + node = (ASTNode) ParseDriver.adaptor.create(HiveParser.TOK_FUNCTIONDI, + "TOK_FUNCTIONDI"); + } + } } node.addChild((ASTNode) ParseDriver.adaptor.create(HiveParser.Identifier, op.getName())); } @@ -416,33 +424,46 @@ public class SqlFunctionConverter { return calciteOp; } - public static SqlAggFunction getCalciteAggFn(String hiveUdfName, + public static SqlAggFunction getCalciteAggFn(String hiveUdfName, boolean isDistinct, ImmutableList<RelDataType> calciteArgTypes, RelDataType calciteRetType) { SqlAggFunction calciteAggFn = (SqlAggFunction) hiveToCalcite.get(hiveUdfName); if (calciteAggFn == null) { - CalciteUDFInfo uInf = getUDFInfo(hiveUdfName, calciteArgTypes, calciteRetType); + CalciteUDFInfo udfInfo = getUDFInfo(hiveUdfName, calciteArgTypes, calciteRetType); switch (hiveUdfName.toLowerCase()) { case "sum": - calciteAggFn = new HiveSqlSumAggFunction(uInf.returnTypeInference, - uInf.operandTypeInference, uInf.operandTypeChecker); + calciteAggFn = new HiveSqlSumAggFunction( + isDistinct, + udfInfo.returnTypeInference, + udfInfo.operandTypeInference, + udfInfo.operandTypeChecker); break; case "count": - calciteAggFn = new HiveSqlCountAggFunction(uInf.returnTypeInference, - uInf.operandTypeInference, uInf.operandTypeChecker); + calciteAggFn = new HiveSqlCountAggFunction( + isDistinct, + udfInfo.returnTypeInference, + udfInfo.operandTypeInference, + udfInfo.operandTypeChecker); break; case "min": - calciteAggFn = new HiveSqlMinMaxAggFunction(uInf.returnTypeInference, - uInf.operandTypeInference, uInf.operandTypeChecker, true); + calciteAggFn = new HiveSqlMinMaxAggFunction( + udfInfo.returnTypeInference, + udfInfo.operandTypeInference, + udfInfo.operandTypeChecker, true); break; case "max": - calciteAggFn = new HiveSqlMinMaxAggFunction(uInf.returnTypeInference, - uInf.operandTypeInference, uInf.operandTypeChecker, false); + calciteAggFn = new HiveSqlMinMaxAggFunction( + udfInfo.returnTypeInference, + udfInfo.operandTypeInference, + udfInfo.operandTypeChecker, false); break; default: - calciteAggFn = new CalciteUDAF(uInf.udfName, uInf.returnTypeInference, - uInf.operandTypeInference, uInf.operandTypeChecker); + calciteAggFn = new CalciteUDAF( + udfInfo.udfName, + udfInfo.returnTypeInference, + udfInfo.operandTypeInference, + udfInfo.operandTypeChecker); break; } http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 3fefbd7..8cc3747 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -1950,7 +1950,7 @@ public class CalcitePlanner extends SemanticAnalyzer { // 3. Get Aggregation FN from Calcite given name, ret type and input arg // type - final SqlAggFunction aggregation = SqlFunctionConverter.getCalciteAggFn(agg.m_udfName, + final SqlAggFunction aggregation = SqlFunctionConverter.getCalciteAggFn(agg.m_udfName, agg.m_distinct, aggArgRelDTBldr.build(), aggFnRetType); return new AggregateCall(aggregation, agg.m_distinct, argList, aggFnRetType, null); @@ -2646,7 +2646,7 @@ public class CalcitePlanner extends SemanticAnalyzer { // 5. Get Calcite Agg Fn final SqlAggFunction calciteAggFn = SqlFunctionConverter.getCalciteAggFn( - hiveAggInfo.m_udfName, calciteAggFnArgsType, calciteAggFnRetType); + hiveAggInfo.m_udfName, hiveAggInfo.m_distinct, calciteAggFnArgsType, calciteAggFnRetType); // 6. Translate Window spec RowResolver inputRR = relToHiveRR.get(srcRel); http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g index 15ca754..61bd10c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g @@ -195,7 +195,7 @@ function RPAREN (KW_OVER ws=window_specification)? -> {$star != null}? ^(TOK_FUNCTIONSTAR functionName $ws?) -> {$dist == null}? ^(TOK_FUNCTION functionName (selectExpression+)? $ws?) - -> ^(TOK_FUNCTIONDI functionName (selectExpression+)?) + -> ^(TOK_FUNCTIONDI functionName (selectExpression+)? $ws?) ; functionName http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java index 29b8510..a8980eb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java @@ -411,6 +411,18 @@ public class PTFInvocationSpec { this.expressions = columns; } + /** + * Add order expressions from the list of expressions in the format of ASTNode + * @param args + */ + public void addExpressions(ArrayList<ASTNode> nodes) { + for (int i = 0; i < nodes.size(); i++) { + OrderExpression expr = new OrderExpression(); + expr.setExpression(nodes.get(i)); + addExpression(expr); + } + } + public void addExpression(OrderExpression c) { expressions = expressions == null ? new ArrayList<OrderExpression>() : expressions; @@ -500,7 +512,9 @@ public class PTFInvocationSpec { { Order order; - public OrderExpression() {} + public OrderExpression() { + order = Order.ASC; + } public OrderExpression(PartitionExpression peSpec) { http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 5ff90a6..8c880c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -11640,11 +11640,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { break; } - if ( wfSpec.isDistinct() ) { - throw new SemanticException(generateErrorMessage(node, - "Count/Sum distinct not supported with Windowing")); - } - wfSpec.setExpression(node); ASTNode nameNode = (ASTNode) node.getChild(0); http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java index a181f7c..1bfe8d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java @@ -122,6 +122,9 @@ public class WindowingSpec { WindowFunctionSpec wFn = (WindowFunctionSpec) expr; WindowSpec wdwSpec = wFn.getWindowSpec(); + // 0. Precheck supported syntax + precheckSyntax(wFn, wdwSpec); + // 1. For Wdw Specs that refer to Window Defns, inherit missing components if ( wdwSpec != null ) { ArrayList<String> sources = new ArrayList<String>(); @@ -144,7 +147,15 @@ public class WindowingSpec { validateWindowFrame(wdwSpec); // 5. Add the Partition expressions as the Order if there is no Order and validate Order spec. - setAndValidateOrderSpec(wdwSpec); + setAndValidateOrderSpec(wFn, wdwSpec); + } + } + + private void precheckSyntax(WindowFunctionSpec wFn, WindowSpec wdwSpec) throws SemanticException { + if (wdwSpec != null ) { + if (wFn.isDistinct && (wdwSpec.windowFrame != null || wdwSpec.getOrder() != null) ) { + throw new SemanticException("Function with DISTINCT cannot work with partition ORDER BY or windowing clause."); + } } } @@ -274,8 +285,8 @@ public class WindowingSpec { * @param wdwSpec * @throws SemanticException */ - private void setAndValidateOrderSpec(WindowSpec wdwSpec) throws SemanticException { - wdwSpec.ensureOrderSpec(); + private void setAndValidateOrderSpec(WindowFunctionSpec wFn, WindowSpec wdwSpec) throws SemanticException { + wdwSpec.ensureOrderSpec(wFn); WindowFrameSpec wFrame = wdwSpec.getWindowFrame(); OrderSpec order = wdwSpec.getOrder(); @@ -479,10 +490,13 @@ public class WindowingSpec { * Partition expressions when the OrderSpec is null; but for now we are setting up * an OrderSpec that copies the Partition expressions. */ - protected void ensureOrderSpec() { + protected void ensureOrderSpec(WindowFunctionSpec wFn) throws SemanticException { if ( getOrder() == null ) { OrderSpec order = new OrderSpec(); order.prefixBy(getPartition()); + if (wFn.isDistinct) { + order.addExpressions(wFn.getArgs()); + } setOrder(order); } } http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java index eaf112e..f526c43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java @@ -17,16 +17,21 @@ */ package org.apache.hadoop.hive.ql.udf.generic; +import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.util.JavaDataModel; +import org.apache.hadoop.hive.serde2.lazy.LazyString; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; /** * This class implements the COUNT aggregation function as in SQL. @@ -67,8 +72,11 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { assert !paramInfo.isAllColumns() : "* not supported in expression list"; } - return new GenericUDAFCountEvaluator().setCountAllColumns( - paramInfo.isAllColumns()); + GenericUDAFCountEvaluator countEvaluator = new GenericUDAFCountEvaluator(); + countEvaluator.setCountAllColumns(paramInfo.isAllColumns()); + countEvaluator.setCountDistinct(paramInfo.isDistinct()); + + return countEvaluator; } /** @@ -77,7 +85,9 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { */ public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator { private boolean countAllColumns = false; + private boolean countDistinct = false; private LongObjectInspector partialCountAggOI; + private ObjectInspector[] inputOI, outputOI; private LongWritable result; @Override @@ -86,19 +96,27 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { super.init(m, parameters); if (mode == Mode.PARTIAL2 || mode == Mode.FINAL) { partialCountAggOI = (LongObjectInspector)parameters[0]; + } else { + inputOI = parameters; + outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI, + ObjectInspectorCopyOption.JAVA); } result = new LongWritable(0); return PrimitiveObjectInspectorFactory.writableLongObjectInspector; } - private GenericUDAFCountEvaluator setCountAllColumns(boolean countAllCols) { + private void setCountAllColumns(boolean countAllCols) { countAllColumns = countAllCols; - return this; + } + + private void setCountDistinct(boolean countDistinct) { + this.countDistinct = countDistinct; } /** class for storing count value. */ @AggregationType(estimable = true) static class CountAgg extends AbstractAggregationBuffer { + Object[] prevColumns = null; // Column values from previous row. Used to compare with current row for the case of COUNT(DISTINCT). long value; @Override public int estimate() { return JavaDataModel.PRIMITIVES2; } @@ -113,6 +131,7 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { @Override public void reset(AggregationBuffer agg) throws HiveException { + ((CountAgg) agg).prevColumns = null; ((CountAgg) agg).value = 0; } @@ -134,6 +153,23 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { break; } } + + // Skip the counting if the values are the same for COUNT(DISTINCT) case + if (countThisRow && countDistinct) { + Object[] prevColumns = ((CountAgg) agg).prevColumns; + if (prevColumns == null) { + ((CountAgg) agg).prevColumns = new Object[parameters.length]; + } else if (ObjectInspectorUtils.compare(parameters, inputOI, prevColumns, outputOI) == 0) { + countThisRow = false; + } + + // We need to keep a copy of values from previous row. + if (countThisRow) { + ((CountAgg) agg).prevColumns = ObjectInspectorUtils.copyToStandardObject( + parameters, inputOI, ObjectInspectorCopyOption.JAVA); + } + } + if (countThisRow) { ((CountAgg) agg).value++; } http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/test/queries/clientpositive/windowing_distinct.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/windowing_distinct.q b/ql/src/test/queries/clientpositive/windowing_distinct.q new file mode 100644 index 0000000..94f4044 --- /dev/null +++ b/ql/src/test/queries/clientpositive/windowing_distinct.q @@ -0,0 +1,30 @@ +drop table windowing_distinct; + +create table windowing_distinct( + index int, + t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + ts timestamp, + dec decimal, + bin binary) + row format delimited + fields terminated by '|'; + +load data local inpath '../../data/files/windowing_distinct.txt' into table windowing_distinct; + + +SELECT COUNT(DISTINCT t) OVER (PARTITION BY index), + COUNT(DISTINCT d) OVER (PARTITION BY index), + COUNT(DISTINCT bo) OVER (PARTITION BY index), + COUNT(DISTINCT s) OVER (PARTITION BY index), + COUNT(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index), + COUNT(DISTINCT ts) OVER (PARTITION BY index), + COUNT(DISTINCT dec) OVER (PARTITION BY index), + COUNT(DISTINCT bin) OVER (PARTITION BY index) +FROM windowing_distinct; http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/test/results/clientpositive/windowing_distinct.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/windowing_distinct.q.out b/ql/src/test/results/clientpositive/windowing_distinct.q.out new file mode 100644 index 0000000..50f8ff8 --- /dev/null +++ b/ql/src/test/results/clientpositive/windowing_distinct.q.out @@ -0,0 +1,78 @@ +PREHOOK: query: drop table windowing_distinct +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table windowing_distinct +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table windowing_distinct( + index int, + t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + ts timestamp, + dec decimal, + bin binary) + row format delimited + fields terminated by '|' +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@windowing_distinct +POSTHOOK: query: create table windowing_distinct( + index int, + t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + ts timestamp, + dec decimal, + bin binary) + row format delimited + fields terminated by '|' +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@windowing_distinct +PREHOOK: query: load data local inpath '../../data/files/windowing_distinct.txt' into table windowing_distinct +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@windowing_distinct +POSTHOOK: query: load data local inpath '../../data/files/windowing_distinct.txt' into table windowing_distinct +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@windowing_distinct +PREHOOK: query: SELECT COUNT(DISTINCT t) OVER (PARTITION BY index), + COUNT(DISTINCT d) OVER (PARTITION BY index), + COUNT(DISTINCT bo) OVER (PARTITION BY index), + COUNT(DISTINCT s) OVER (PARTITION BY index), + COUNT(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index), + COUNT(DISTINCT ts) OVER (PARTITION BY index), + COUNT(DISTINCT dec) OVER (PARTITION BY index), + COUNT(DISTINCT bin) OVER (PARTITION BY index) +FROM windowing_distinct +PREHOOK: type: QUERY +PREHOOK: Input: default@windowing_distinct +#### A masked pattern was here #### +POSTHOOK: query: SELECT COUNT(DISTINCT t) OVER (PARTITION BY index), + COUNT(DISTINCT d) OVER (PARTITION BY index), + COUNT(DISTINCT bo) OVER (PARTITION BY index), + COUNT(DISTINCT s) OVER (PARTITION BY index), + COUNT(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index), + COUNT(DISTINCT ts) OVER (PARTITION BY index), + COUNT(DISTINCT dec) OVER (PARTITION BY index), + COUNT(DISTINCT bin) OVER (PARTITION BY index) +FROM windowing_distinct +POSTHOOK: type: QUERY +POSTHOOK: Input: default@windowing_distinct +#### A masked pattern was here #### +2 2 2 2 2 2 2 1 +2 2 2 2 2 2 2 1 +2 2 2 2 2 2 2 1 +2 2 2 2 2 2 2 2 +2 2 2 2 2 2 2 2 +2 2 2 2 2 2 2 2 http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java index 7a13eb0..33e5357 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java @@ -145,6 +145,21 @@ public final class ObjectInspectorUtils { } /** + * Get the corresponding standard ObjectInspector array for an array of ObjectInspector. + */ + public static ObjectInspector[] getStandardObjectInspector(ObjectInspector[] ois, + ObjectInspectorCopyOption objectInspectorOption) { + if (ois == null) return null; + + ObjectInspector[] result = new ObjectInspector[ois.length]; + for (int i = 0; i < ois.length; i++) { + result[i] = getStandardObjectInspector(ois[i], objectInspectorOption); + } + + return result; + } + + /** * Get the corresponding standard ObjectInspector for an ObjectInspector. * * The returned ObjectInspector can be used to inspect the standard object. @@ -274,6 +289,23 @@ public final class ObjectInspectorUtils { } /** + * Returns a deep copy of an array of objects + */ + public static Object[] copyToStandardObject( + Object[] o, ObjectInspector[] oi, ObjectInspectorCopyOption objectInspectorOption) { + if (o == null) return null; + assert(o.length == oi.length); + + Object[] result = new Object[o.length]; + for (int i = 0; i < o.length; i++) { + result[i] = ObjectInspectorUtils.copyToStandardObject( + o[i], oi[i], objectInspectorOption); + } + + return result; + } + + /** * Returns a deep copy of the Object o that can be scanned by a * StandardObjectInspector returned by getStandardObjectInspector(oi). */
