http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java deleted file mode 100644 index 4aac9d3..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java +++ /dev/null @@ -1,521 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.vector.expressions.DecimalUtil; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; -import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.AggregationDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage; -import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; - -/** - * Generated from template VectorUDAFAvg.txt. - */ -@Description(name = "avg", - value = "_FUNC_(AVG) - Returns the average value of expr (vectorized, type: decimal)") -public class VectorUDAFAvgDecimal extends VectorAggregateExpression { - - private static final long serialVersionUID = 1L; - - /** class for storing the current aggregate value. */ - static class Aggregation implements AggregationBuffer { - - private static final long serialVersionUID = 1L; - - transient private final HiveDecimalWritable sum = new HiveDecimalWritable(); - transient private long count; - transient private boolean isNull; - - public void sumValueWithNullCheck(HiveDecimalWritable writable) { - if (isNull) { - // Make a copy since we intend to mutate sum. - sum.set(writable); - count = 1; - isNull = false; - } else { - // Note that if sum is out of range, mutateAdd will ignore the call. - // At the end, sum.isSet() can be checked for null. - sum.mutateAdd(writable); - count++; - } - } - - public void sumValueNoNullCheck(HiveDecimalWritable writable) { - sum.mutateAdd(writable); - count++; - } - - @Override - public int getVariableSize() { - throw new UnsupportedOperationException(); - } - - @Override - public void reset() { - isNull = true; - sum.setFromLong(0L); - count = 0; - } - } - - private VectorExpression inputExpression; - - @Override - public VectorExpression inputExpression() { - return inputExpression; - } - - transient private Object[] partialResult; - transient private LongWritable resultCount; - transient private HiveDecimalWritable resultSum; - transient private StructObjectInspector soi; - - /** - * The scale of the SUM in the partial output - */ - private short sumScale; - - /** - * The precision of the SUM in the partial output - */ - private short sumPrecision; - - /** - * the scale of the input expression - */ - private short inputScale; - - /** - * the precision of the input expression - */ - private short inputPrecision; - - public VectorUDAFAvgDecimal(VectorExpression inputExpression) { - this(); - this.inputExpression = inputExpression; - } - - public VectorUDAFAvgDecimal() { - super(); - partialResult = new Object[2]; - resultCount = new LongWritable(); - resultSum = new HiveDecimalWritable(); - partialResult[0] = resultCount; - partialResult[1] = resultSum; - - } - - private void initPartialResultInspector() { - // the output type of the vectorized partial aggregate must match the - // expected type for the row-mode aggregation - // For decimal, the type is "same number of integer digits and 4 more decimal digits" - - DecimalTypeInfo dtiSum = GenericUDAFAverage.deriveSumFieldTypeInfo(inputPrecision, inputScale); - this.sumScale = (short) dtiSum.scale(); - this.sumPrecision = (short) dtiSum.precision(); - - List<ObjectInspector> foi = new ArrayList<ObjectInspector>(); - foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); - foi.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(dtiSum)); - List<String> fname = new ArrayList<String>(); - fname.add("count"); - fname.add("sum"); - soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); - } - - private Aggregation getCurrentAggregationBuffer( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - int row) { - VectorAggregationBufferRow mySet = aggregationBufferSets[row]; - Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(bufferIndex); - return myagg; - } - - @Override - public void aggregateInputSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - VectorizedRowBatch batch) throws HiveException { - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - inputExpression.evaluate(batch); - - DecimalColumnVector inputVector = ( DecimalColumnVector)batch. - cols[this.inputExpression.getOutputColumn()]; - HiveDecimalWritable[] vector = inputVector.vector; - - if (inputVector.noNulls) { - if (inputVector.isRepeating) { - iterateNoNullsRepeatingWithAggregationSelection( - aggregationBufferSets, bufferIndex, - vector[0], batchSize); - } else { - if (batch.selectedInUse) { - iterateNoNullsSelectionWithAggregationSelection( - aggregationBufferSets, bufferIndex, - vector, batch.selected, batchSize); - } else { - iterateNoNullsWithAggregationSelection( - aggregationBufferSets, bufferIndex, - vector, batchSize); - } - } - } else { - if (inputVector.isRepeating) { - if (batch.selectedInUse) { - iterateHasNullsRepeatingSelectionWithAggregationSelection( - aggregationBufferSets, bufferIndex, - vector[0], batchSize, batch.selected, inputVector.isNull); - } else { - iterateHasNullsRepeatingWithAggregationSelection( - aggregationBufferSets, bufferIndex, - vector[0], batchSize, inputVector.isNull); - } - } else { - if (batch.selectedInUse) { - iterateHasNullsSelectionWithAggregationSelection( - aggregationBufferSets, bufferIndex, - vector, batchSize, batch.selected, inputVector.isNull); - } else { - iterateHasNullsWithAggregationSelection( - aggregationBufferSets, bufferIndex, - vector, batchSize, inputVector.isNull); - } - } - } - } - - private void iterateNoNullsRepeatingWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - HiveDecimalWritable value, - int batchSize) { - - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValueWithNullCheck(value); - } - } - - private void iterateNoNullsSelectionWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - HiveDecimalWritable[] values, - int[] selection, - int batchSize) { - - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValueWithNullCheck(values[selection[i]]); - } - } - - private void iterateNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - HiveDecimalWritable[] values, - int batchSize) { - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValueWithNullCheck(values[i]); - } - } - - private void iterateHasNullsRepeatingSelectionWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - HiveDecimalWritable value, - int batchSize, - int[] selection, - boolean[] isNull) { - - if (isNull[0]) { - return; - } - - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValueWithNullCheck(value); - } - - } - - private void iterateHasNullsRepeatingWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - HiveDecimalWritable value, - int batchSize, - boolean[] isNull) { - - if (isNull[0]) { - return; - } - - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValueWithNullCheck(value); - } - } - - private void iterateHasNullsSelectionWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - HiveDecimalWritable[] values, - int batchSize, - int[] selection, - boolean[] isNull) { - - for (int j=0; j < batchSize; ++j) { - int i = selection[j]; - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - j); - myagg.sumValueWithNullCheck(values[i]); - } - } - } - - private void iterateHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - HiveDecimalWritable[] values, - int batchSize, - boolean[] isNull) { - - for (int i=0; i < batchSize; ++i) { - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValueWithNullCheck(values[i]); - } - } - } - - - @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) - throws HiveException { - - inputExpression.evaluate(batch); - - DecimalColumnVector inputVector = - (DecimalColumnVector)batch.cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - Aggregation myagg = (Aggregation)agg; - - HiveDecimalWritable[] vector = inputVector.vector; - - if (inputVector.isRepeating) { - if (inputVector.noNulls) { - if (myagg.isNull) { - myagg.isNull = false; - myagg.sum.setFromLong(0L); - myagg.count = 0; - } - HiveDecimal value = vector[0].getHiveDecimal(); - HiveDecimal multiple = value.multiply(HiveDecimal.create(batchSize)); - myagg.sum.mutateAdd(multiple); - myagg.count += batchSize; - } - return; - } - - if (!batch.selectedInUse && inputVector.noNulls) { - iterateNoSelectionNoNulls(myagg, vector, batchSize); - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull); - } - else if (inputVector.noNulls){ - iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected); - } - else { - iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected); - } - } - - private void iterateSelectionHasNulls( - Aggregation myagg, - HiveDecimalWritable[] vector, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - int i = selected[j]; - if (!isNull[i]) { - myagg.sumValueWithNullCheck(vector[i]); - } - } - } - - private void iterateSelectionNoNulls( - Aggregation myagg, - HiveDecimalWritable[] vector, - int batchSize, - int[] selected) { - - if (myagg.isNull) { - myagg.isNull = false; - myagg.sum.setFromLong(0L); - myagg.count = 0; - } - - for (int i=0; i< batchSize; ++i) { - myagg.sumValueNoNullCheck(vector[selected[i]]); - } - } - - private void iterateNoSelectionHasNulls( - Aggregation myagg, - HiveDecimalWritable[] vector, - int batchSize, - boolean[] isNull) { - - for(int i=0;i<batchSize;++i) { - if (!isNull[i]) { - myagg.sumValueWithNullCheck(vector[i]); - } - } - } - - private void iterateNoSelectionNoNulls( - Aggregation myagg, - HiveDecimalWritable[] vector, - int batchSize) { - if (myagg.isNull) { - myagg.isNull = false; - myagg.sum.setFromLong(0L); - myagg.count = 0; - } - - for (int i=0;i<batchSize;++i) { - myagg.sumValueNoNullCheck(vector[i]); - } - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new Aggregation(); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - Aggregation myAgg = (Aggregation) agg; - myAgg.reset(); - } - - @Override - public Object evaluateOutput( - AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - // !isSet checks for overflow. - if (myagg.isNull || !myagg.sum.isSet()) { - return null; - } - else { - assert(0 < myagg.count); - resultCount.set (myagg.count); - resultSum.set(myagg.sum); - return partialResult; - } - } - - @Override - public ObjectInspector getOutputObjectInspector() { - return soi; - } - - @Override - public long getAggregationBufferFixedSize() { - JavaDataModel model = JavaDataModel.get(); - return JavaDataModel.alignUp( - model.object() + - model.primitive2() * 2, - model.memoryAlign()); - } - - @Override - public void init(AggregationDesc desc) throws HiveException { - ExprNodeDesc inputExpr = desc.getParameters().get(0); - DecimalTypeInfo tiInput = (DecimalTypeInfo) inputExpr.getTypeInfo(); - this.inputScale = (short) tiInput.scale(); - this.inputPrecision = (short) tiInput.precision(); - - initPartialResultInspector(); - } - - public VectorExpression getInputExpression() { - return inputExpression; - } - - public void setInputExpression(VectorExpression inputExpression) { - this.inputExpression = inputExpression; - } -} -
http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java deleted file mode 100644 index 365dcf6..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java +++ /dev/null @@ -1,488 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.AggregationDesc; -import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; - -/** - * Generated from template VectorUDAFAvg.txt. - */ -@Description(name = "avg", - value = "_FUNC_(expr) - Returns the average value of expr (vectorized, type: timestamp)") -public class VectorUDAFAvgTimestamp extends VectorAggregateExpression { - - private static final long serialVersionUID = 1L; - - /** class for storing the current aggregate value. */ - static class Aggregation implements AggregationBuffer { - - private static final long serialVersionUID = 1L; - - transient private double sum; - transient private long count; - - /** - * Value is explicitly (re)initialized in reset() - */ - transient private boolean isNull = true; - - public void sumValue(double value) { - if (isNull) { - sum = value; - count = 1; - isNull = false; - } else { - sum += value; - count++; - } - } - - @Override - public int getVariableSize() { - throw new UnsupportedOperationException(); - } - - @Override - public void reset () { - isNull = true; - sum = 0; - count = 0L; - } - } - - private VectorExpression inputExpression; - - @Override - public VectorExpression inputExpression() { - return inputExpression; - } - - transient private Object[] partialResult; - transient private LongWritable resultCount; - transient private DoubleWritable resultSum; - transient private StructObjectInspector soi; - - public VectorUDAFAvgTimestamp(VectorExpression inputExpression) { - this(); - this.inputExpression = inputExpression; - } - - public VectorUDAFAvgTimestamp() { - super(); - partialResult = new Object[2]; - resultCount = new LongWritable(); - resultSum = new DoubleWritable(); - partialResult[0] = resultCount; - partialResult[1] = resultSum; - initPartialResultInspector(); - } - - private void initPartialResultInspector() { - List<ObjectInspector> foi = new ArrayList<ObjectInspector>(); - foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); - foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); - List<String> fname = new ArrayList<String>(); - fname.add("count"); - fname.add("sum"); - soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); - } - - private Aggregation getCurrentAggregationBuffer( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - int row) { - VectorAggregationBufferRow mySet = aggregationBufferSets[row]; - Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(bufferIndex); - return myagg; - } - - @Override - public void aggregateInputSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - VectorizedRowBatch batch) throws HiveException { - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - inputExpression.evaluate(batch); - - TimestampColumnVector inputColVector = (TimestampColumnVector)batch. - cols[this.inputExpression.getOutputColumn()]; - - if (inputColVector.noNulls) { - if (inputColVector.isRepeating) { - iterateNoNullsRepeatingWithAggregationSelection( - aggregationBufferSets, bufferIndex, - inputColVector.getDouble(0), - batchSize); - } else { - if (batch.selectedInUse) { - iterateNoNullsSelectionWithAggregationSelection( - aggregationBufferSets, bufferIndex, - inputColVector, batch.selected, batchSize); - } else { - iterateNoNullsWithAggregationSelection( - aggregationBufferSets, bufferIndex, - inputColVector, batchSize); - } - } - } else { - if (inputColVector.isRepeating) { - if (batch.selectedInUse) { - iterateHasNullsRepeatingSelectionWithAggregationSelection( - aggregationBufferSets, bufferIndex, - inputColVector.getDouble(0), batchSize, batch.selected, inputColVector.isNull); - } else { - iterateHasNullsRepeatingWithAggregationSelection( - aggregationBufferSets, bufferIndex, - inputColVector.getDouble(0), batchSize, inputColVector.isNull); - } - } else { - if (batch.selectedInUse) { - iterateHasNullsSelectionWithAggregationSelection( - aggregationBufferSets, bufferIndex, - inputColVector, batchSize, batch.selected, inputColVector.isNull); - } else { - iterateHasNullsWithAggregationSelection( - aggregationBufferSets, bufferIndex, - inputColVector, batchSize, inputColVector.isNull); - } - } - } - } - - private void iterateNoNullsRepeatingWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - double value, - int batchSize) { - - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValue(value); - } - } - - private void iterateNoNullsSelectionWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - TimestampColumnVector inputColVector, - int[] selection, - int batchSize) { - - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValue( - inputColVector.getDouble(selection[i])); - } - } - - private void iterateNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - TimestampColumnVector inputColVector, - int batchSize) { - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValue(inputColVector.getDouble(i)); - } - } - - private void iterateHasNullsRepeatingSelectionWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - double value, - int batchSize, - int[] selection, - boolean[] isNull) { - - for (int i=0; i < batchSize; ++i) { - if (!isNull[selection[i]]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValue(value); - } - } - - } - - private void iterateHasNullsRepeatingWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - double value, - int batchSize, - boolean[] isNull) { - - for (int i=0; i < batchSize; ++i) { - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValue(value); - } - } - } - - private void iterateHasNullsSelectionWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - TimestampColumnVector inputColVector, - int batchSize, - int[] selection, - boolean[] isNull) { - - for (int j=0; j < batchSize; ++j) { - int i = selection[j]; - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - j); - myagg.sumValue(inputColVector.getDouble(i)); - } - } - } - - private void iterateHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int bufferIndex, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull) { - - for (int i=0; i < batchSize; ++i) { - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - bufferIndex, - i); - myagg.sumValue(inputColVector.getDouble(i)); - } - } - } - - @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) - throws HiveException { - - inputExpression.evaluate(batch); - - TimestampColumnVector inputColVector = - (TimestampColumnVector)batch.cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - Aggregation myagg = (Aggregation)agg; - - if (inputColVector.isRepeating) { - if (inputColVector.noNulls) { - if (myagg.isNull) { - myagg.isNull = false; - myagg.sum = 0; - myagg.count = 0; - } - myagg.sum += inputColVector.getDouble(0)*batchSize; - myagg.count += batchSize; - } - return; - } - - if (!batch.selectedInUse && inputColVector.noNulls) { - iterateNoSelectionNoNulls(myagg, inputColVector, batchSize); - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNulls(myagg, inputColVector, batchSize, inputColVector.isNull); - } - else if (inputColVector.noNulls){ - iterateSelectionNoNulls(myagg, inputColVector, batchSize, batch.selected); - } - else { - iterateSelectionHasNulls(myagg, inputColVector, batchSize, inputColVector.isNull, batch.selected); - } - } - - private void iterateSelectionHasNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - int i = selected[j]; - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.isNull = false; - myagg.sum = 0; - myagg.count = 0; - } - myagg.sum += value; - myagg.count += 1; - } - } - } - - private void iterateSelectionNoNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - int[] selected) { - - if (myagg.isNull) { - myagg.isNull = false; - myagg.sum = 0; - myagg.count = 0; - } - - for (int i=0; i< batchSize; ++i) { - double value = inputColVector.getDouble(selected[i]); - myagg.sum += value; - myagg.count += 1; - } - } - - private void iterateNoSelectionHasNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull) { - - for(int i=0;i<batchSize;++i) { - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.isNull = false; - myagg.sum = 0; - myagg.count = 0; - } - myagg.sum += value; - myagg.count += 1; - } - } - } - - private void iterateNoSelectionNoNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize) { - if (myagg.isNull) { - myagg.isNull = false; - myagg.sum = 0; - myagg.count = 0; - } - - for (int i=0;i<batchSize;++i) { - double value = inputColVector.getDouble(i); - myagg.sum += value; - myagg.count += 1; - } - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new Aggregation(); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - Aggregation myAgg = (Aggregation) agg; - myAgg.reset(); - } - - @Override - public Object evaluateOutput( - AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - assert(0 < myagg.count); - resultCount.set (myagg.count); - resultSum.set (myagg.sum); - return partialResult; - } - } - - @Override - public ObjectInspector getOutputObjectInspector() { - return soi; - } - - @Override - public long getAggregationBufferFixedSize() { - JavaDataModel model = JavaDataModel.get(); - return JavaDataModel.alignUp( - model.object() + - model.primitive2() * 2, - model.memoryAlign()); - } - - @Override - public void init(AggregationDesc desc) throws HiveException { - // No-op - } - - public VectorExpression getInputExpression() { - return inputExpression; - } - - public void setInputExpression(VectorExpression inputExpression) { - this.inputExpression = inputExpression; - } -} - http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java index 52b05ca..4b3eca09 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggreg import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -44,7 +45,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hive.common.util.BloomFilter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,18 +54,11 @@ public class VectorUDAFBloomFilter extends VectorAggregateExpression { private static final long serialVersionUID = 1L; - private VectorExpression inputExpression; - - @Override - public VectorExpression inputExpression() { - return inputExpression; - } - private long expectedEntries = -1; private ValueProcessor valueProcessor; - transient private int bitSetSize = -1; - transient private BytesWritable bw = new BytesWritable(); - transient private ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + transient private int bitSetSize; + transient private BytesWritable bw; + transient private ByteArrayOutputStream byteStream; /** * class for storing the current aggregate value. @@ -90,9 +83,15 @@ public class VectorUDAFBloomFilter extends VectorAggregateExpression { } } - public VectorUDAFBloomFilter(VectorExpression inputExpression) { - this(); - this.inputExpression = inputExpression; + public VectorUDAFBloomFilter(VectorExpression inputExpression, + GenericUDAFEvaluator.Mode mode) { + super(inputExpression, mode); + } + + private void init() { + bitSetSize = -1; + bw = new BytesWritable(); + byteStream = new ByteArrayOutputStream(); // Instantiate the ValueProcessor based on the input type VectorExpressionDescriptor.ArgumentType inputType = @@ -123,10 +122,6 @@ public class VectorUDAFBloomFilter extends VectorAggregateExpression { } } - public VectorUDAFBloomFilter() { - super(); - } - @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { if (expectedEntries < 0) { @@ -405,19 +400,13 @@ public class VectorUDAFBloomFilter extends VectorAggregateExpression { @Override public void init(AggregationDesc desc) throws HiveException { + init(); + GenericUDAFBloomFilterEvaluator udafBloomFilter = (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator(); expectedEntries = udafBloomFilter.getExpectedEntries(); } - public VectorExpression getInputExpression() { - return inputExpression; - } - - public void setInputExpression(VectorExpression inputExpression) { - this.inputExpression = inputExpression; - } - public long getExpectedEntries() { return expectedEntries; } http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java index b986eb4..67a7c50 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression.AggregationBuffer; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; @@ -39,16 +40,9 @@ public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression { private static final long serialVersionUID = 1L; - private VectorExpression inputExpression; - - @Override - public VectorExpression inputExpression() { - return inputExpression; - } - private long expectedEntries = -1; - transient private int aggBufferSize = -1; - transient private BytesWritable bw = new BytesWritable(); + transient private int aggBufferSize; + transient private BytesWritable bw; /** * class for storing the current aggregate value. @@ -81,13 +75,14 @@ public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression { } } - public VectorUDAFBloomFilterMerge(VectorExpression inputExpression) { - this(); - this.inputExpression = inputExpression; + public VectorUDAFBloomFilterMerge(VectorExpression inputExpression, + GenericUDAFEvaluator.Mode mode) { + super(inputExpression, mode); } - public VectorUDAFBloomFilterMerge() { - super(); + private void init() { + aggBufferSize = -1; + bw = new BytesWritable(); } @Override @@ -355,6 +350,8 @@ public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression { @Override public void init(AggregationDesc desc) throws HiveException { + init(); + GenericUDAFBloomFilterEvaluator udafBloomFilter = (GenericUDAFBloomFilterEvaluator) desc.getGenericUDAFEvaluator(); expectedEntries = udafBloomFilter.getExpectedEntries(); http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java index cadb6dd..d9490c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; @@ -59,22 +60,13 @@ public class VectorUDAFCount extends VectorAggregateExpression { } } - private VectorExpression inputExpression = null; + transient private LongWritable result; - @Override - public VectorExpression inputExpression() { - return inputExpression; - } - - transient private final LongWritable result; - - public VectorUDAFCount(VectorExpression inputExpression) { - this(); - this.inputExpression = inputExpression; + public VectorUDAFCount(VectorExpression inputExpression, GenericUDAFEvaluator.Mode mode) { + super(inputExpression, mode); } - public VectorUDAFCount() { - super(); + private void init() { result = new LongWritable(0); } @@ -270,15 +262,7 @@ public class VectorUDAFCount extends VectorAggregateExpression { @Override public void init(AggregationDesc desc) throws HiveException { - // No-op - } - - public VectorExpression getInputExpression() { - return inputExpression; - } - - public void setInputExpression(VectorExpression inputExpression) { - this.inputExpression = inputExpression; + init(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java index c489f8f..10a8660 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; @@ -60,22 +61,13 @@ public class VectorUDAFCountMerge extends VectorAggregateExpression { } } - private VectorExpression inputExpression = null; + transient private LongWritable result; - @Override - public VectorExpression inputExpression() { - return inputExpression; - } - - transient private final LongWritable result; - - public VectorUDAFCountMerge(VectorExpression inputExpression) { - this(); - this.inputExpression = inputExpression; + public VectorUDAFCountMerge(VectorExpression inputExpression, GenericUDAFEvaluator.Mode mode) { + super(inputExpression, mode); } - public VectorUDAFCountMerge() { - super(); + private void init() { result = new LongWritable(0); } @@ -396,15 +388,7 @@ public class VectorUDAFCountMerge extends VectorAggregateExpression { @Override public void init(AggregationDesc desc) throws HiveException { - // No-op - } - - public VectorExpression getInputExpression() { - return inputExpression; - } - - public void setInputExpression(VectorExpression inputExpression) { - this.inputExpression = inputExpression; + init(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java index 3b66030..3bc6a71 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; @@ -57,21 +58,13 @@ public class VectorUDAFCountStar extends VectorAggregateExpression { } } + transient private LongWritable result; - @Override - public VectorExpression inputExpression() { - // None. - return null; + public VectorUDAFCountStar(VectorExpression inputExpression, GenericUDAFEvaluator.Mode mode) { + super(inputExpression, mode); } - transient private final LongWritable result; - - public VectorUDAFCountStar(VectorExpression inputExpression) { - this(); - } - - public VectorUDAFCountStar() { - super(); + private void init() { result = new LongWritable(0); } @@ -153,7 +146,6 @@ public class VectorUDAFCountStar extends VectorAggregateExpression { @Override public void init(AggregationDesc desc) throws HiveException { - // No-op + init(); } } - http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java deleted file mode 100644 index 5388d37..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java +++ /dev/null @@ -1,533 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; -import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.AggregationDesc; -import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; - -/** -* VectorUDAFStdPopTimestamp. Vectorized implementation for VARIANCE aggregates. -*/ -@Description(name = "std,stddev,stddev_pop", - value = "_FUNC_(x) - Returns the standard deviation of a set of numbers (vectorized, timestamp)") -public class VectorUDAFStdPopTimestamp extends VectorAggregateExpression { - - private static final long serialVersionUID = 1L; - - /** - /* class for storing the current aggregate value. - */ - private static final class Aggregation implements AggregationBuffer { - - private static final long serialVersionUID = 1L; - - transient private double sum; - transient private long count; - transient private double variance; - - /** - * Value is explicitly (re)initialized in reset() (despite the init() bellow...) - */ - transient private boolean isNull = true; - - public void init() { - isNull = false; - sum = 0; - count = 0; - variance = 0; - } - - @Override - public int getVariableSize() { - throw new UnsupportedOperationException(); - } - - @Override - public void reset () { - isNull = true; - sum = 0; - count = 0; - variance = 0; - } - } - - private VectorExpression inputExpression; - - @Override - public VectorExpression inputExpression() { - return inputExpression; - } - - transient private LongWritable resultCount; - transient private DoubleWritable resultSum; - transient private DoubleWritable resultVariance; - transient private Object[] partialResult; - - transient private ObjectInspector soi; - - - public VectorUDAFStdPopTimestamp(VectorExpression inputExpression) { - this(); - this.inputExpression = inputExpression; - } - - public VectorUDAFStdPopTimestamp() { - super(); - partialResult = new Object[3]; - resultCount = new LongWritable(); - resultSum = new DoubleWritable(); - resultVariance = new DoubleWritable(); - partialResult[0] = resultCount; - partialResult[1] = resultSum; - partialResult[2] = resultVariance; - initPartialResultInspector(); - } - - private void initPartialResultInspector() { - List<ObjectInspector> foi = new ArrayList<ObjectInspector>(); - foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); - foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); - foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); - - List<String> fname = new ArrayList<String>(); - fname.add("count"); - fname.add("sum"); - fname.add("variance"); - - soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); - } - - private Aggregation getCurrentAggregationBuffer( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int row) { - VectorAggregationBufferRow mySet = aggregationBufferSets[row]; - Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); - return myagg; - } - - - @Override - public void aggregateInputSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - VectorizedRowBatch batch) throws HiveException { - - inputExpression.evaluate(batch); - - TimestampColumnVector inputColVector = (TimestampColumnVector)batch. - cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - if (inputColVector.isRepeating) { - if (inputColVector.noNulls || !inputColVector.isNull[0]) { - iterateRepeatingNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector.getDouble(0), batchSize); - } - } - else if (!batch.selectedInUse && inputColVector.noNulls) { - iterateNoSelectionNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize); - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, inputColVector.isNull); - } - else if (inputColVector.noNulls){ - iterateSelectionNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, batch.selected); - } - else { - iterateSelectionHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, - inputColVector.isNull, batch.selected); - } - - } - - private void iterateRepeatingNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - double value, - int batchSize) { - - for (int i=0; i<batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - private void iterateSelectionHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - j); - int i = selected[j]; - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateSelectionNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - int[] selected) { - - for (int i=0; i< batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - double value = inputColVector.getDouble(selected[i]); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - private void iterateNoSelectionHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull) { - - for(int i=0;i<batchSize;++i) { - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateNoSelectionNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize) { - - for (int i=0; i<batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - if (myagg.isNull) { - myagg.init (); - } - double value = inputColVector.getDouble(i); - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) - throws HiveException { - - inputExpression.evaluate(batch); - - TimestampColumnVector inputColVector = (TimestampColumnVector)batch. - cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - Aggregation myagg = (Aggregation)agg; - - if (inputColVector.isRepeating) { - if (inputColVector.noNulls) { - iterateRepeatingNoNulls(myagg, inputColVector.getDouble(0), batchSize); - } - } - else if (!batch.selectedInUse && inputColVector.noNulls) { - iterateNoSelectionNoNulls(myagg, inputColVector, batchSize); - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNulls(myagg, inputColVector, batchSize, inputColVector.isNull); - } - else if (inputColVector.noNulls){ - iterateSelectionNoNulls(myagg, inputColVector, batchSize, batch.selected); - } - else { - iterateSelectionHasNulls(myagg, inputColVector, batchSize, inputColVector.isNull, batch.selected); - } - } - - private void iterateRepeatingNoNulls( - Aggregation myagg, - double value, - int batchSize) { - - if (myagg.isNull) { - myagg.init (); - } - - // TODO: conjure a formula w/o iterating - // - - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // We pulled out i=0 so we can remove the count > 1 check in the loop - for (int i=1; i<batchSize; ++i) { - myagg.sum += value; - myagg.count += 1; - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - private void iterateSelectionHasNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - int i = selected[j]; - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateSelectionNoNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - int[] selected) { - - if (myagg.isNull) { - myagg.init (); - } - - double value = inputColVector.getDouble(selected[0]); - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // i=0 was pulled out to remove the count > 1 check in the loop - // - for (int i=1; i< batchSize; ++i) { - value = inputColVector.getDouble(selected[i]); - myagg.sum += value; - myagg.count += 1; - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - private void iterateNoSelectionHasNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull) { - - for(int i=0;i<batchSize;++i) { - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateNoSelectionNoNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize) { - - if (myagg.isNull) { - myagg.init (); - } - - double value = inputColVector.getDouble(0); - myagg.sum += value; - myagg.count += 1; - - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // i=0 was pulled out to remove count > 1 check - for (int i=1; i<batchSize; ++i) { - value = inputColVector.getDouble(i); - myagg.sum += value; - myagg.count += 1; - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new Aggregation(); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - Aggregation myAgg = (Aggregation) agg; - myAgg.reset(); - } - - @Override - public Object evaluateOutput( - AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - assert(0 < myagg.count); - resultCount.set (myagg.count); - resultSum.set (myagg.sum); - resultVariance.set (myagg.variance); - return partialResult; - } - } - @Override - public ObjectInspector getOutputObjectInspector() { - return soi; - } - - @Override - public long getAggregationBufferFixedSize() { - JavaDataModel model = JavaDataModel.get(); - return JavaDataModel.alignUp( - model.object() + - model.primitive2()*3+ - model.primitive1(), - model.memoryAlign()); - } - - @Override - public void init(AggregationDesc desc) throws HiveException { - // No-op - } - - public VectorExpression getInputExpression() { - return inputExpression; - } - - public void setInputExpression(VectorExpression inputExpression) { - this.inputExpression = inputExpression; - } -} - http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java deleted file mode 100644 index 1769dc0..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java +++ /dev/null @@ -1,533 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; -import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.AggregationDesc; -import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; - -/** -* VectorUDAFStdSampTimestamp. Vectorized implementation for VARIANCE aggregates. -*/ -@Description(name = "stddev_samp", - value = "_FUNC_(x) - Returns the sample standard deviation of a set of numbers (vectorized, double)") -public class VectorUDAFStdSampTimestamp extends VectorAggregateExpression { - - private static final long serialVersionUID = 1L; - - /** - /* class for storing the current aggregate value. - */ - private static final class Aggregation implements AggregationBuffer { - - private static final long serialVersionUID = 1L; - - transient private double sum; - transient private long count; - transient private double variance; - - /** - * Value is explicitly (re)initialized in reset() (despite the init() bellow...) - */ - transient private boolean isNull = true; - - public void init() { - isNull = false; - sum = 0; - count = 0; - variance = 0; - } - - @Override - public int getVariableSize() { - throw new UnsupportedOperationException(); - } - - @Override - public void reset () { - isNull = true; - sum = 0; - count = 0; - variance = 0; - } - } - - private VectorExpression inputExpression; - - @Override - public VectorExpression inputExpression() { - return inputExpression; - } - - transient private LongWritable resultCount; - transient private DoubleWritable resultSum; - transient private DoubleWritable resultVariance; - transient private Object[] partialResult; - - transient private ObjectInspector soi; - - - public VectorUDAFStdSampTimestamp(VectorExpression inputExpression) { - this(); - this.inputExpression = inputExpression; - } - - public VectorUDAFStdSampTimestamp() { - super(); - partialResult = new Object[3]; - resultCount = new LongWritable(); - resultSum = new DoubleWritable(); - resultVariance = new DoubleWritable(); - partialResult[0] = resultCount; - partialResult[1] = resultSum; - partialResult[2] = resultVariance; - initPartialResultInspector(); - } - - private void initPartialResultInspector() { - List<ObjectInspector> foi = new ArrayList<ObjectInspector>(); - foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); - foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); - foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); - - List<String> fname = new ArrayList<String>(); - fname.add("count"); - fname.add("sum"); - fname.add("variance"); - - soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); - } - - private Aggregation getCurrentAggregationBuffer( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int row) { - VectorAggregationBufferRow mySet = aggregationBufferSets[row]; - Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); - return myagg; - } - - - @Override - public void aggregateInputSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - VectorizedRowBatch batch) throws HiveException { - - inputExpression.evaluate(batch); - - TimestampColumnVector inputColVector = (TimestampColumnVector)batch. - cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - if (inputColVector.isRepeating) { - if (inputColVector.noNulls || !inputColVector.isNull[0]) { - iterateRepeatingNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector.getDouble(0), batchSize); - } - } - else if (!batch.selectedInUse && inputColVector.noNulls) { - iterateNoSelectionNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize); - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, inputColVector.isNull); - } - else if (inputColVector.noNulls){ - iterateSelectionNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, batch.selected); - } - else { - iterateSelectionHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, - inputColVector.isNull, batch.selected); - } - - } - - private void iterateRepeatingNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - double value, - int batchSize) { - - for (int i=0; i<batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - private void iterateSelectionHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - j); - int i = selected[j]; - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateSelectionNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - int[] selected) { - - for (int i=0; i< batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - double value = inputColVector.getDouble(selected[i]); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - private void iterateNoSelectionHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull) { - - for(int i=0;i<batchSize;++i) { - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateNoSelectionNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize) { - - for (int i=0; i<batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - if (myagg.isNull) { - myagg.init (); - } - double value = inputColVector.getDouble(i); - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) - throws HiveException { - - inputExpression.evaluate(batch); - - TimestampColumnVector inputColVector = (TimestampColumnVector)batch. - cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - Aggregation myagg = (Aggregation)agg; - - if (inputColVector.isRepeating) { - if (inputColVector.noNulls) { - iterateRepeatingNoNulls(myagg, inputColVector.getDouble(0), batchSize); - } - } - else if (!batch.selectedInUse && inputColVector.noNulls) { - iterateNoSelectionNoNulls(myagg, inputColVector, batchSize); - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNulls(myagg, inputColVector, batchSize, inputColVector.isNull); - } - else if (inputColVector.noNulls){ - iterateSelectionNoNulls(myagg, inputColVector, batchSize, batch.selected); - } - else { - iterateSelectionHasNulls(myagg, inputColVector, batchSize, inputColVector.isNull, batch.selected); - } - } - - private void iterateRepeatingNoNulls( - Aggregation myagg, - double value, - int batchSize) { - - if (myagg.isNull) { - myagg.init (); - } - - // TODO: conjure a formula w/o iterating - // - - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // We pulled out i=0 so we can remove the count > 1 check in the loop - for (int i=1; i<batchSize; ++i) { - myagg.sum += value; - myagg.count += 1; - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - private void iterateSelectionHasNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - int i = selected[j]; - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateSelectionNoNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - int[] selected) { - - if (myagg.isNull) { - myagg.init (); - } - - double value = inputColVector.getDouble(selected[0]); - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // i=0 was pulled out to remove the count > 1 check in the loop - // - for (int i=1; i< batchSize; ++i) { - value = inputColVector.getDouble(selected[i]); - myagg.sum += value; - myagg.count += 1; - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - private void iterateNoSelectionHasNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull) { - - for(int i=0;i<batchSize;++i) { - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateNoSelectionNoNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize) { - - if (myagg.isNull) { - myagg.init (); - } - - double value = inputColVector.getDouble(0); - myagg.sum += value; - myagg.count += 1; - - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // i=0 was pulled out to remove count > 1 check - for (int i=1; i<batchSize; ++i) { - value = inputColVector.getDouble(i); - myagg.sum += value; - myagg.count += 1; - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new Aggregation(); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - Aggregation myAgg = (Aggregation) agg; - myAgg.reset(); - } - - @Override - public Object evaluateOutput( - AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - assert(0 < myagg.count); - resultCount.set (myagg.count); - resultSum.set (myagg.sum); - resultVariance.set (myagg.variance); - return partialResult; - } - } - @Override - public ObjectInspector getOutputObjectInspector() { - return soi; - } - - @Override - public long getAggregationBufferFixedSize() { - JavaDataModel model = JavaDataModel.get(); - return JavaDataModel.alignUp( - model.object() + - model.primitive2()*3+ - model.primitive1(), - model.memoryAlign()); - } - - @Override - public void init(AggregationDesc desc) throws HiveException { - // No-op - } - - public VectorExpression getInputExpression() { - return inputExpression; - } - - public void setInputExpression(VectorExpression inputExpression) { - this.inputExpression = inputExpression; - } -} - http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java index a37e3f6..e3e8574 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java @@ -27,10 +27,15 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; /** * VectorUDAFSumDecimal. Vectorized implementation for SUM aggregates. @@ -73,20 +78,22 @@ public class VectorUDAFSumDecimal extends VectorAggregateExpression { } } - private VectorExpression inputExpression; + private DecimalTypeInfo outputDecimalTypeInfo; - @Override - public VectorExpression inputExpression() { - return inputExpression; + public VectorUDAFSumDecimal(VectorExpression inputExpression, GenericUDAFEvaluator.Mode mode) { + super(inputExpression, mode); } - public VectorUDAFSumDecimal(VectorExpression inputExpression) { - this(); - this.inputExpression = inputExpression; - } + private void init() { - public VectorUDAFSumDecimal() { - super(); + String outputType = inputExpression.getOutputType(); + DecimalTypeInfo inputDecimalTypeInfo = + (DecimalTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(outputType); + + outputDecimalTypeInfo = + GenericUDAFSum.GenericUDAFSumHiveDecimal.getOutputDecimalTypeInfoForSum( + inputDecimalTypeInfo.getPrecision(), inputDecimalTypeInfo.getScale(), + this.mode); } private Aggregation getCurrentAggregationBuffer( @@ -421,6 +428,8 @@ public class VectorUDAFSumDecimal extends VectorAggregateExpression { return null; } else { + myagg.sum.mutateEnforcePrecisionScale( + outputDecimalTypeInfo.getPrecision(), outputDecimalTypeInfo.getScale()); return myagg.sum.getHiveDecimal(); } } @@ -440,15 +449,7 @@ public class VectorUDAFSumDecimal extends VectorAggregateExpression { @Override public void init(AggregationDesc desc) throws HiveException { - // No-op - } - - public VectorExpression getInputExpression() { - return inputExpression; - } - - public void setInputExpression(VectorExpression inputExpression) { - this.inputExpression = inputExpression; + init(); } }