Repository: hive Updated Branches: refs/heads/master f6b26b9e4 -> 7c57c05cd
HIVE-15617: Improve the avg performance for Range based window (Aihua Xu, reviewed by Yongzhi Chen) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7c57c05c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7c57c05c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7c57c05c Branch: refs/heads/master Commit: 7c57c05cd6e152fc361131af41ceae0f5ca8c230 Parents: f6b26b9 Author: Aihua Xu <aihu...@apache.org> Authored: Fri Jan 13 11:48:28 2017 -0500 Committer: Aihua Xu <aihu...@apache.org> Committed: Mon Jan 23 09:30:16 2017 -0500 ---------------------------------------------------------------------- .../hive/ql/udf/generic/GenericUDAFAverage.java | 37 ++- .../hive/ql/udf/ptf/BasePartitionEvaluator.java | 259 +++++++++++++++---- 2 files changed, 240 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7c57c05c/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java index 5ad5c06..a28f7e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java @@ -19,18 +19,21 @@ package org.apache.hadoop.hive.ql.udf.generic; import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.PTFPartition; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import 
org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationType; +import org.apache.hadoop.hive.ql.udf.ptf.BasePartitionEvaluator; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; @@ -219,6 +222,19 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { }; } + + @Override + protected BasePartitionEvaluator createPartitionEvaluator( + WindowFrameDef winFrame, + PTFPartition partition, + List<PTFExpressionDef> parameters, + ObjectInspector outputOI) { + try { + return new BasePartitionEvaluator.AvgPartitionDoubleEvaluator(this, winFrame, partition, parameters, inputOI, outputOI); + } catch(HiveException e) { + return super.createPartitionEvaluator(winFrame, partition, parameters, outputOI); + } + } } public static class GenericUDAFAverageEvaluatorDecimal extends AbstractGenericUDAFAverageEvaluator<HiveDecimal> { @@ -358,6 +374,19 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { }; } + + @Override + protected BasePartitionEvaluator createPartitionEvaluator( + WindowFrameDef winFrame, + PTFPartition partition, + List<PTFExpressionDef> parameters, + ObjectInspector outputOI) { + try { + return new BasePartitionEvaluator.AvgPartitionHiveDecimalEvaluator(this, winFrame, partition, parameters, inputOI, outputOI); + } catch(HiveException e) { + return super.createPartitionEvaluator(winFrame, partition, parameters, outputOI); + } + } } @AggregationType(estimable = true) @@ -409,6 +438,8 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver 
{ super.init(m, parameters); // init input + partialResult = new Object[2]; + partialResult[0] = new LongWritable(0); if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) { inputOI = (PrimitiveObjectInspector) parameters[0]; copiedOI = (PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI, @@ -436,8 +467,6 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { fname.add("count"); fname.add("sum"); fname.add("input"); - partialResult = new Object[2]; - partialResult[0] = new LongWritable(0); // index 1 set by child return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); } else { @@ -445,7 +474,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { } } - protected boolean isWindowingDistinct() { + public boolean isWindowingDistinct() { return isWindowing && avgDistinct; } http://git-wip-us.apache.org/repos/asf/hive/blob/7c57c05c/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java index f5f9f7b..86954ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/BasePartitionEvaluator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.udf.ptf; import java.util.List; +import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.PTFOperator; import org.apache.hadoop.hive.ql.exec.PTFPartition; import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator; @@ -30,6 +31,7 @@ import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType; import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; 
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage.AbstractGenericUDAFAverageEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; @@ -84,6 +86,96 @@ public class BasePartitionEvaluator { } } + /** + * Define some type-specific operations to be used in the subclasses + */ + private static abstract class TypeOperationBase<ResultType> { + public abstract ResultType add(ResultType t1, ResultType t2); + public abstract ResultType minus(ResultType t1, ResultType t2); + public abstract ResultType div(ResultType sum, long numRows); + } + + private static class TypeOperationLongWritable extends TypeOperationBase<LongWritable> { + @Override + public LongWritable add(LongWritable t1, LongWritable t2) { + if (t1 == null && t2 == null) return null; + return new LongWritable((t1 == null ? 0 : t1.get()) + (t2 == null ? 0 : t2.get())); + } + + @Override + public LongWritable minus(LongWritable t1, LongWritable t2) { + if (t1 == null && t2 == null) return null; + return new LongWritable((t1 == null ? 0 : t1.get()) - (t2 == null ? 0 : t2.get())); + } + + @Override + public LongWritable div(LongWritable sum, long numRows) { + return null; // Not used + } + } + + private static class TypeOperationDoubleWritable extends TypeOperationBase<DoubleWritable> { + @Override + public DoubleWritable add(DoubleWritable t1, DoubleWritable t2) { + if (t1 == null && t2 == null) return null; + return new DoubleWritable((t1 == null ? 0 : t1.get()) + (t2 == null ? 0 : t2.get())); + } + + public DoubleWritable minus(DoubleWritable t1, DoubleWritable t2) { + if (t1 == null && t2 == null) return null; + return new DoubleWritable((t1 == null ? 0 : t1.get()) - (t2 == null ?
0 : t2.get())); + } + + @Override + public DoubleWritable div(DoubleWritable sum, long numRows) { + if (sum == null || numRows == 0) return null; + + return new DoubleWritable(sum.get() / (double)numRows); + } + } + + private static class TypeOperationHiveDecimalWritable extends TypeOperationBase<HiveDecimalWritable> { + @Override + public HiveDecimalWritable div(HiveDecimalWritable sum, long numRows) { + if (sum == null || numRows == 0) return null; + + HiveDecimalWritable result = new HiveDecimalWritable(sum); + result.mutateDivide(HiveDecimal.create(numRows)); + return result; + } + + @Override + public HiveDecimalWritable add(HiveDecimalWritable t1, HiveDecimalWritable t2) { + if (t1 == null && t2 == null) return null; + + if (t1 == null) { + return new HiveDecimalWritable(t2); + } else { + HiveDecimalWritable result = new HiveDecimalWritable(t1); + if (t2 != null) { + result.mutateAdd(t2); + } + return result; + } + } + + @Override + public HiveDecimalWritable minus(HiveDecimalWritable t1, HiveDecimalWritable t2) { + if (t1 == null && t2 == null) return null; + + if (t2 == null) { + return new HiveDecimalWritable(t1); + } else { + HiveDecimalWritable result = new HiveDecimalWritable(t2); + result.mutateNegate(); + if (t1 != null) { + result.mutateAdd(t1); + } + return result; + } + } + } + public BasePartitionEvaluator( GenericUDAFEvaluator wrappedEvaluator, WindowFrameDef winFrame, @@ -217,7 +309,14 @@ public class BasePartitionEvaluator { * */ public static abstract class SumPartitionEvaluator<ResultType extends Writable> extends BasePartitionEvaluator { + static class WindowSumAgg<ResultType> extends AbstractAggregationBuffer { + Range prevRange; + ResultType prevSum; + boolean empty; + } + protected final WindowSumAgg<ResultType> sumAgg; + protected TypeOperationBase<ResultType> typeOperation; public SumPartitionEvaluator( GenericUDAFEvaluator wrappedEvaluator, @@ -229,15 +328,6 @@ public class BasePartitionEvaluator { sumAgg = new 
WindowSumAgg<ResultType>(); } - static class WindowSumAgg<ResultType> extends AbstractAggregationBuffer { - Range prevRange; - ResultType prevSum; - boolean empty; - } - - public abstract ResultType add(ResultType t1, ResultType t2); - public abstract ResultType minus(ResultType t1, ResultType t2); - @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public Object iterate(int currentRow, LeadLagInfo leadLagInfo) throws HiveException { @@ -262,7 +352,8 @@ public class BasePartitionEvaluator { Range r2 = new Range(sumAgg.prevRange.end, currentRange.end, partition); ResultType sum1 = (ResultType)calcFunctionValue(r1.iterator(), leadLagInfo); ResultType sum2 = (ResultType)calcFunctionValue(r2.iterator(), leadLagInfo); - result = add(minus(sumAgg.prevSum, sum1), sum2); + result = typeOperation.add(typeOperation.minus(sumAgg.prevSum, sum1), sum2); + sumAgg.prevRange = currentRange; sumAgg.prevSum = result; } @@ -276,18 +367,7 @@ public class BasePartitionEvaluator { WindowFrameDef winFrame, PTFPartition partition, List<PTFExpressionDef> parameters, ObjectInspector outputOI) { super(wrappedEvaluator, winFrame, partition, parameters, outputOI); - } - - @Override - public DoubleWritable add(DoubleWritable t1, DoubleWritable t2) { - if (t1 == null && t2 == null) return null; - return new DoubleWritable((t1 == null ? 0 : t1.get()) + (t2 == null ? 0 : t2.get())); - } - - @Override - public DoubleWritable minus(DoubleWritable t1, DoubleWritable t2) { - if (t1 == null && t2 == null) return null; - return new DoubleWritable((t1 == null ? 0 : t1.get()) - (t2 == null ? 
0 : t2.get())); + this.typeOperation = new TypeOperationDoubleWritable(); } } @@ -296,18 +376,7 @@ public class BasePartitionEvaluator { WindowFrameDef winFrame, PTFPartition partition, List<PTFExpressionDef> parameters, ObjectInspector outputOI) { super(wrappedEvaluator, winFrame, partition, parameters, outputOI); - } - - @Override - public LongWritable add(LongWritable t1, LongWritable t2) { - if (t1 == null && t2 == null) return null; - return new LongWritable((t1 == null ? 0 : t1.get()) + (t2 == null ? 0 : t2.get())); - } - - @Override - public LongWritable minus(LongWritable t1, LongWritable t2) { - if (t1 == null && t2 == null) return null; - return new LongWritable((t1 == null ? 0 : t1.get()) - (t2 == null ? 0 : t2.get())); + this.typeOperation = new TypeOperationLongWritable(); } } @@ -316,33 +385,119 @@ public class BasePartitionEvaluator { WindowFrameDef winFrame, PTFPartition partition, List<PTFExpressionDef> parameters, ObjectInspector outputOI) { super(wrappedEvaluator, winFrame, partition, parameters, outputOI); + this.typeOperation = new TypeOperationHiveDecimalWritable(); } + } - @Override - public HiveDecimalWritable add(HiveDecimalWritable t1, HiveDecimalWritable t2) { - if (t1 == null && t2 == null) return null; - if (t1 == null) { - return t2; - } else { - if (t2 != null) { - t1.mutateAdd(t2); + /** + * The partition evaluator for the average function + * @param <ResultType> the Writable type of the running sum + */ + public static abstract class AvgPartitionEvaluator<ResultType extends Writable> + extends BasePartitionEvaluator { + static class WindowAvgAgg<ResultType> extends AbstractAggregationBuffer { + Range prevRange; + ResultType prevSum; + long prevCount; + boolean empty; + } + + protected SumPartitionEvaluator<ResultType> sumEvaluator; + protected TypeOperationBase<ResultType> typeOperation; + WindowAvgAgg<ResultType> avgAgg = new WindowAvgAgg<ResultType>(); + + public AvgPartitionEvaluator( + GenericUDAFEvaluator wrappedEvaluator, + WindowFrameDef winFrame, + PTFPartition
partition, + List<PTFExpressionDef> parameters, + ObjectInspector outputOI) { + super(wrappedEvaluator, winFrame, partition, parameters, outputOI); + } + + /** + * Calculate the partial result sum + count given a partition range + * @return a 2-element Object array of [count long, sum ResultType] + */ + private Object[] calcPartialResult(PTFPartitionIterator<Object> pItr, LeadLagInfo leadLagInfo) + throws HiveException { + // To handle the case like AVG(LAG(f)) over(), aggregation function includes + // LAG/LEAD call + PTFOperator.connectLeadLagFunctionsToPartition(leadLagInfo, pItr); + + AggregationBuffer aggBuffer = wrappedEvaluator.getNewAggregationBuffer(); + Object[] argValues = new Object[parameters == null ? 0 : parameters.size()]; + while(pItr.hasNext()) + { + Object row = pItr.next(); + int i = 0; + if ( parameters != null ) { + for(PTFExpressionDef param : parameters) + { + argValues[i++] = param.getExprEvaluator().evaluate(row); + } } - return t1; + wrappedEvaluator.aggregate(aggBuffer, argValues); } + + // The object [count LongWritable, sum ResultType] is reused during evaluating, so copy the values out + Object[] partial = (Object[])wrappedEvaluator.terminatePartial(aggBuffer); + return new Object[] {((LongWritable)partial[0]).get(), ObjectInspectorUtils.copyToStandardObject(partial[1], outputOI)}; } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public HiveDecimalWritable minus(HiveDecimalWritable t1, HiveDecimalWritable t2) { - if (t1 == null && t2 == null) return null; - if (t1 == null) { - t2.mutateNegate(); - return t2; + public Object iterate(int currentRow, LeadLagInfo leadLagInfo) throws HiveException { + // avg(distinct) is currently not supported by the partition evaluator; fall back to the default implementation + if (((AbstractGenericUDAFAverageEvaluator)wrappedEvaluator).isWindowingDistinct()) { + return super.iterate(currentRow, leadLagInfo); + } + + Range currentRange = getRange(winFrame, currentRow, partition); + if (currentRow == 0 || // Reset for the new partition + avgAgg.prevRange ==
null || + currentRange.getSize() <= currentRange.getDiff(avgAgg.prevRange)) { + Object[] partial = (Object[])calcPartialResult(currentRange.iterator(), leadLagInfo); + avgAgg.prevRange = currentRange; + avgAgg.empty = false; + avgAgg.prevSum = (ResultType)partial[1]; + avgAgg.prevCount = (long)partial[0]; } else { - if (t2 != null) { - t1.mutateSubtract(t2); - } - return t1; + // Given the previous range and the current range, calculate the new sum + // from the previous sum and the difference to save the computation. + Range r1 = new Range(avgAgg.prevRange.start, currentRange.start, partition); + Range r2 = new Range(avgAgg.prevRange.end, currentRange.end, partition); + Object[] partial1 = (Object[])calcPartialResult(r1.iterator(), leadLagInfo); + Object[] partial2 = (Object[])calcPartialResult(r2.iterator(), leadLagInfo); + ResultType sum = typeOperation.add(typeOperation.minus(avgAgg.prevSum, (ResultType)partial1[1]), (ResultType)partial2[1]); + long count = avgAgg.prevCount - (long)partial1[0]+ (long)partial2[0]; + + avgAgg.prevRange = currentRange; + avgAgg.prevSum = sum; + avgAgg.prevCount = count; } + + return typeOperation.div(avgAgg.prevSum, avgAgg.prevCount); + } + } + + public static class AvgPartitionDoubleEvaluator extends AvgPartitionEvaluator<DoubleWritable> { + + public AvgPartitionDoubleEvaluator(GenericUDAFEvaluator wrappedEvaluator, + WindowFrameDef winFrame, PTFPartition partition, + List<PTFExpressionDef> parameters, ObjectInspector inputOI, ObjectInspector outputOI) throws HiveException { + super(wrappedEvaluator, winFrame, partition, parameters, outputOI); + this.typeOperation = new TypeOperationDoubleWritable(); + } + } + + public static class AvgPartitionHiveDecimalEvaluator extends AvgPartitionEvaluator<HiveDecimalWritable> { + + public AvgPartitionHiveDecimalEvaluator(GenericUDAFEvaluator wrappedEvaluator, + WindowFrameDef winFrame, PTFPartition partition, + List<PTFExpressionDef> parameters, ObjectInspector inputOI, ObjectInspector 
outputOI) throws HiveException { + super(wrappedEvaluator, winFrame, partition, parameters, outputOI); + this.typeOperation = new TypeOperationHiveDecimalWritable(); } } } \ No newline at end of file