http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumTimestamp.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumTimestamp.java new file mode 100644 index 0000000..9651ad3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumTimestamp.java @@ -0,0 +1,434 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; + +import java.sql.Timestamp; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.ql.util.JavaDataModel; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; + +/** +* VectorUDAFSumTimestamp. Vectorized implementation for SUM aggregates. +*/ +@Description(name = "sum", + value = "_FUNC_(expr) - Returns the sum value of expr (vectorized, type: timestamp)") +public class VectorUDAFSumTimestamp extends VectorAggregateExpression { + + private static final long serialVersionUID = 1L; + + /** + * class for storing the current aggregate value.
+ */ + private static final class Aggregation implements AggregationBuffer { + + private static final long serialVersionUID = 1L; + + transient private double sum; + + /** + * Value is explicitly (re)initialized in reset() + */ + transient private boolean isNull = true; + + public void sumValue(double value) { + if (isNull) { + sum = value; + isNull = false; + } else { + sum += value; + } + } + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void reset () { + isNull = true; + sum = 0; + } + } + + transient private DoubleWritable result; + + public VectorUDAFSumTimestamp(VectorExpression inputExpression, GenericUDAFEvaluator.Mode mode) { + super(inputExpression, mode); + } + + private void init() { + result = new DoubleWritable(); + } + + private Aggregation getCurrentAggregationBuffer( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + int row) { + VectorAggregationBufferRow mySet = aggregationBufferSets[row]; + Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); + return myagg; + } + + @Override + public void aggregateInputSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + VectorizedRowBatch batch) throws HiveException { + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + inputExpression.evaluate(batch); + + TimestampColumnVector inputVector = + (TimestampColumnVector)batch.cols[this.inputExpression.getOutputColumn()]; + + if (inputVector.noNulls) { + if (inputVector.isRepeating) { + iterateNoNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputVector.getDouble(0), batchSize); + } else { + if (batch.selectedInUse) { + iterateNoNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputVector, batch.selected, batchSize); + } else { + iterateNoNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputVector, batchSize); + } + } + } else { + if (inputVector.isRepeating) { + if (batch.selectedInUse) { + iterateHasNullsRepeatingSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputVector.getDouble(0), batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsRepeatingWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputVector.getDouble(0), batchSize, inputVector.isNull); + } + } else { + if (batch.selectedInUse) { + iterateHasNullsSelectionWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputVector, batchSize, batch.selected, inputVector.isNull); + } else { + iterateHasNullsWithAggregationSelection( + aggregationBufferSets, aggregateIndex, + inputVector, batchSize, inputVector.isNull); + } + } + } + } + + private void iterateNoNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double value, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(value); + } + } + + private void iterateNoNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + TimestampColumnVector inputVector, + int[] selection, + int batchSize) { + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); +
myagg.sumValue(inputVector.getDouble(selection[i])); + } + } + + private void iterateNoNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + TimestampColumnVector inputVector, + int batchSize) { + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(inputVector.getDouble(i)); + } + } + + private void iterateHasNullsRepeatingSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double value, + int batchSize, + int[] selection, + boolean[] isNull) { + + if (isNull[0]) { + return; + } + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(value); + } + + } + + private void iterateHasNullsRepeatingWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + double value, + int batchSize, + boolean[] isNull) { + + if (isNull[0]) { + return; + } + + for (int i=0; i < batchSize; ++i) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(value); + } + } + + private void iterateHasNullsSelectionWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + TimestampColumnVector inputVector, + int batchSize, + int[] selection, + boolean[] isNull) { + + for (int j=0; j < batchSize; ++j) { + int i = selection[j]; + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + j); + myagg.sumValue(inputVector.getDouble(i)); + } + } + } + + private void iterateHasNullsWithAggregationSelection( + VectorAggregationBufferRow[] aggregationBufferSets, + int aggregateIndex, + TimestampColumnVector inputVector, + int batchSize, + boolean[] isNull) { + + for (int i=0; i < batchSize; ++i) { + if (!isNull[i]) { + Aggregation myagg = getCurrentAggregationBuffer( + aggregationBufferSets, + aggregateIndex, + i); + myagg.sumValue(inputVector.getDouble(i)); + } + } + } + + @Override + public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) + throws HiveException { + + inputExpression.evaluate(batch); + + TimestampColumnVector inputVector = + (TimestampColumnVector)batch.cols[this.inputExpression.getOutputColumn()]; + + int batchSize = batch.size; + + if (batchSize == 0) { + return; + } + + Aggregation myagg = (Aggregation)agg; + + if (inputVector.isRepeating) { + if (inputVector.noNulls) { + if (myagg.isNull) { + myagg.isNull = false; + myagg.sum = 0; + } + myagg.sum += inputVector.getDouble(0) * batchSize; + } + return; + } + + if (!batch.selectedInUse && inputVector.noNulls) { + iterateNoSelectionNoNulls(myagg, inputVector, batchSize); + } + else if (!batch.selectedInUse) { + iterateNoSelectionHasNulls(myagg, inputVector, batchSize, inputVector.isNull); + } + else if (inputVector.noNulls){ + iterateSelectionNoNulls(myagg, inputVector, batchSize, batch.selected); + } + else { + iterateSelectionHasNulls(myagg, inputVector, batchSize, inputVector.isNull, batch.selected); + } + } + + private void iterateSelectionHasNulls( + Aggregation myagg, + TimestampColumnVector inputVector, + int batchSize, + boolean[] isNull, + int[] selected) { + + for (int j=0; j< batchSize; ++j) { + int i = selected[j]; + if (!isNull[i]) { + if (myagg.isNull) { + myagg.isNull = false; + myagg.sum = 0; + } + myagg.sum 
+= inputVector.getDouble(i); + } + } + } + + private void iterateSelectionNoNulls( + Aggregation myagg, + TimestampColumnVector inputVector, + int batchSize, + int[] selected) { + + if (myagg.isNull) { + myagg.sum = 0; + myagg.isNull = false; + } + + for (int i=0; i< batchSize; ++i) { + myagg.sum += inputVector.getDouble(selected[i]); + } + } + + private void iterateNoSelectionHasNulls( + Aggregation myagg, + TimestampColumnVector inputVector, + int batchSize, + boolean[] isNull) { + + for(int i=0;i<batchSize;++i) { + if (!isNull[i]) { + if (myagg.isNull) { + myagg.sum = 0; + myagg.isNull = false; + } + myagg.sum += inputVector.getDouble(i); + } + } + } + + private void iterateNoSelectionNoNulls( + Aggregation myagg, + TimestampColumnVector inputVector, + int batchSize) { + if (myagg.isNull) { + myagg.sum = 0; + myagg.isNull = false; + } + + for (int i=0;i<batchSize;++i) { + myagg.sum += inputVector.getDouble(i); + } + } + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + return new Aggregation(); + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + Aggregation myAgg = (Aggregation) agg; + myAgg.reset(); + } + + @Override + public Object evaluateOutput(AggregationBuffer agg) throws HiveException { + Aggregation myagg = (Aggregation) agg; + if (myagg.isNull) { + return null; + } + else { + result.set(myagg.sum); + return result; + } + } + + @Override + public ObjectInspector getOutputObjectInspector() { + return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; + } + + @Override + public long getAggregationBufferFixedSize() { + JavaDataModel model = JavaDataModel.get(); + return JavaDataModel.alignUp( + model.object(), + model.memoryAlign()); + } + + @Override + public void init(AggregationDesc desc) throws HiveException { + init(); + } +} \ No newline at end of file
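The new VectorUDAFSumTimestamp above, and the VectorUDAFVarPopTimestamp/VectorUDAFVarSampTimestamp files deleted below, all follow the same vectorized-aggregation pattern: each timestamp is converted to a double via TimestampColumnVector.getDouble, and specialized loops cover each combination of isRepeating, noNulls, and selectedInUse. Two numeric idioms recur: a repeating (constant, non-null) column contributes value * batchSize to a sum without a loop, and the variance classes maintain (count, sum, variance) with the one-pass update t = n*x - S; variance += t*t / (n*(n-1)), where "variance" is really a running sum of squared deviations. A minimal standalone sketch of those two updates, checked against a two-pass computation (plain Java, outside Hive's VectorAggregateExpression API; all names below are illustrative only):

// Standalone sketch; class and field names are hypothetical, not Hive API.
public class IncrementalVarianceSketch {

  private double sum = 0;
  private long count = 0;
  private double variance = 0; // running sum of squared deviations, as in the Hive classes

  // Same one-pass recurrence as the diffs: t = n*x - S; variance += t*t / (n*(n-1)).
  void add(double value) {
    sum += value;
    count += 1;
    if (count > 1) {
      double t = count * value - sum;
      variance += (t * t) / ((double) count * (count - 1));
    }
  }

  double varPop()  { return variance / count; }       // VAR_POP divides by n
  double varSamp() { return variance / (count - 1); } // VAR_SAMP divides by n - 1

  public static void main(String[] args) {
    double[] values = {1.5, 2.0, 2.5, 4.0}; // stand-ins for TimestampColumnVector.getDouble(i)

    IncrementalVarianceSketch agg = new IncrementalVarianceSketch();
    for (double v : values) {
      agg.add(v);
    }

    // Two-pass reference: sum of squared deviations from the mean.
    double mean = 0;
    for (double v : values) {
      mean += v;
    }
    mean /= values.length;
    double ref = 0;
    for (double v : values) {
      ref += (v - mean) * (v - mean);
    }
    System.out.println("one-pass = " + agg.variance + ", two-pass = " + ref); // both 3.5

    // Repeating-column shortcut from VectorUDAFSumTimestamp.aggregateInput:
    // a constant non-null column adds value * batchSize to the sum in O(1).
    double value = 2.5;
    int batchSize = 1024;
    System.out.println("repeating sum contribution = " + (value * batchSize));
  }
}

Note that the vectorized variance classes never perform the final division themselves: they emit (count, sum, variance) as a partial-aggregation struct and leave the division by n or n-1 to the downstream evaluator that merges partials.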
http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java deleted file mode 100644 index 61cdeaa..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java +++ /dev/null @@ -1,533 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; -import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.AggregationDesc; -import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; - -/** -* VectorUDAFVarPopTimestamp. Vectorized implementation for VARIANCE aggregates. -*/ -@Description(name = "variance, var_pop", - value = "_FUNC_(x) - Returns the variance of a set of numbers (vectorized, timestamp)") -public class VectorUDAFVarPopTimestamp extends VectorAggregateExpression { - - private static final long serialVersionUID = 1L; - - /** - /* class for storing the current aggregate value. - */ - private static final class Aggregation implements AggregationBuffer { - - private static final long serialVersionUID = 1L; - - transient private double sum; - transient private long count; - transient private double variance; - - /** - * Value is explicitly (re)initialized in reset() (despite the init() bellow...) 
- */ - transient private boolean isNull = true; - - public void init() { - isNull = false; - sum = 0; - count = 0; - variance = 0; - } - - @Override - public int getVariableSize() { - throw new UnsupportedOperationException(); - } - - @Override - public void reset () { - isNull = true; - sum = 0; - count = 0; - variance = 0; - } - } - - private VectorExpression inputExpression; - - @Override - public VectorExpression inputExpression() { - return inputExpression; - } - - transient private LongWritable resultCount; - transient private DoubleWritable resultSum; - transient private DoubleWritable resultVariance; - transient private Object[] partialResult; - - transient private ObjectInspector soi; - - - public VectorUDAFVarPopTimestamp(VectorExpression inputExpression) { - this(); - this.inputExpression = inputExpression; - } - - public VectorUDAFVarPopTimestamp() { - super(); - partialResult = new Object[3]; - resultCount = new LongWritable(); - resultSum = new DoubleWritable(); - resultVariance = new DoubleWritable(); - partialResult[0] = resultCount; - partialResult[1] = resultSum; - partialResult[2] = resultVariance; - initPartialResultInspector(); - } - - private void initPartialResultInspector() { - List<ObjectInspector> foi = new ArrayList<ObjectInspector>(); - foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); - foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); - foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); - - List<String> fname = new ArrayList<String>(); - fname.add("count"); - fname.add("sum"); - fname.add("variance"); - - soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); - } - - private Aggregation getCurrentAggregationBuffer( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int row) { - VectorAggregationBufferRow mySet = aggregationBufferSets[row]; - Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); - return myagg; - } - - - @Override - public void aggregateInputSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - VectorizedRowBatch batch) throws HiveException { - - inputExpression.evaluate(batch); - - TimestampColumnVector inputColVector = (TimestampColumnVector)batch. 
- cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - if (inputColVector.isRepeating) { - if (inputColVector.noNulls || !inputColVector.isNull[0]) { - iterateRepeatingNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector.getDouble(0), batchSize); - } - } - else if (!batch.selectedInUse && inputColVector.noNulls) { - iterateNoSelectionNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize); - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, inputColVector.isNull); - } - else if (inputColVector.noNulls){ - iterateSelectionNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, batch.selected); - } - else { - iterateSelectionHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, - inputColVector.isNull, batch.selected); - } - - } - - private void iterateRepeatingNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - double value, - int batchSize) { - - for (int i=0; i<batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - private void iterateSelectionHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - j); - int i = selected[j]; - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateSelectionNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - int[] selected) { - - for (int i=0; i< batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - double value = inputColVector.getDouble(selected[i]); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - private void iterateNoSelectionHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull) { - - for(int i=0;i<batchSize;++i) { - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - 
myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateNoSelectionNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize) { - - for (int i=0; i<batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - if (myagg.isNull) { - myagg.init (); - } - double value = inputColVector.getDouble(i); - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) - throws HiveException { - - inputExpression.evaluate(batch); - - TimestampColumnVector inputColVector = (TimestampColumnVector)batch. - cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - Aggregation myagg = (Aggregation)agg; - - if (inputColVector.isRepeating) { - if (inputColVector.noNulls) { - iterateRepeatingNoNulls(myagg, inputColVector.getDouble(0), batchSize); - } - } - else if (!batch.selectedInUse && inputColVector.noNulls) { - iterateNoSelectionNoNulls(myagg, inputColVector, batchSize); - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNulls(myagg, inputColVector, batchSize, inputColVector.isNull); - } - else if (inputColVector.noNulls){ - iterateSelectionNoNulls(myagg, inputColVector, batchSize, batch.selected); - } - else { - iterateSelectionHasNulls(myagg, inputColVector, batchSize, inputColVector.isNull, batch.selected); - } - } - - private void iterateRepeatingNoNulls( - Aggregation myagg, - double value, - int batchSize) { - - if (myagg.isNull) { - myagg.init (); - } - - // TODO: conjure a formula w/o iterating - // - - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // We pulled out i=0 so we can remove the count > 1 check in the loop - for (int i=1; i<batchSize; ++i) { - myagg.sum += value; - myagg.count += 1; - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - private void iterateSelectionHasNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - int i = selected[j]; - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateSelectionNoNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - int[] selected) { - - if (myagg.isNull) { - myagg.init (); - } - - double value = inputColVector.getDouble(selected[0]); - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // i=0 was pulled out to remove the count > 1 check in the loop - // - for (int i=1; i< batchSize; ++i) { - value = inputColVector.getDouble(selected[i]); - myagg.sum += value; - myagg.count += 1; - double t = 
myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - private void iterateNoSelectionHasNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull) { - - for(int i=0;i<batchSize;++i) { - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateNoSelectionNoNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize) { - - if (myagg.isNull) { - myagg.init (); - } - - double value = inputColVector.getDouble(0); - myagg.sum += value; - myagg.count += 1; - - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // i=0 was pulled out to remove count > 1 check - for (int i=1; i<batchSize; ++i) { - value = inputColVector.getDouble(i); - myagg.sum += value; - myagg.count += 1; - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new Aggregation(); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - Aggregation myAgg = (Aggregation) agg; - myAgg.reset(); - } - - @Override - public Object evaluateOutput( - AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - assert(0 < myagg.count); - resultCount.set (myagg.count); - resultSum.set (myagg.sum); - resultVariance.set (myagg.variance); - return partialResult; - } - } - @Override - public ObjectInspector getOutputObjectInspector() { - return soi; - } - - @Override - public long getAggregationBufferFixedSize() { - JavaDataModel model = JavaDataModel.get(); - return JavaDataModel.alignUp( - model.object() + - model.primitive2()*3+ - model.primitive1(), - model.memoryAlign()); - } - - @Override - public void init(AggregationDesc desc) throws HiveException { - // No-op - } - - public VectorExpression getInputExpression() { - return inputExpression; - } - - public void setInputExpression(VectorExpression inputExpression) { - this.inputExpression = inputExpression; - } -} - http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java deleted file mode 100644 index c375461..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java +++ /dev/null @@ -1,533 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; -import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.AggregationDesc; -import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; - -/** -* VectorUDAFVarSampTimestamp. Vectorized implementation for VARIANCE aggregates. -*/ -@Description(name = "var_samp", - value = "_FUNC_(x) - Returns the sample variance of a set of numbers (vectorized, double)") -public class VectorUDAFVarSampTimestamp extends VectorAggregateExpression { - - private static final long serialVersionUID = 1L; - - /** - /* class for storing the current aggregate value. - */ - private static final class Aggregation implements AggregationBuffer { - - private static final long serialVersionUID = 1L; - - transient private double sum; - transient private long count; - transient private double variance; - - /** - * Value is explicitly (re)initialized in reset() (despite the init() bellow...) 
- */ - transient private boolean isNull = true; - - public void init() { - isNull = false; - sum = 0; - count = 0; - variance = 0; - } - - @Override - public int getVariableSize() { - throw new UnsupportedOperationException(); - } - - @Override - public void reset () { - isNull = true; - sum = 0; - count = 0; - variance = 0; - } - } - - private VectorExpression inputExpression; - - @Override - public VectorExpression inputExpression() { - return inputExpression; - } - - transient private LongWritable resultCount; - transient private DoubleWritable resultSum; - transient private DoubleWritable resultVariance; - transient private Object[] partialResult; - - transient private ObjectInspector soi; - - - public VectorUDAFVarSampTimestamp(VectorExpression inputExpression) { - this(); - this.inputExpression = inputExpression; - } - - public VectorUDAFVarSampTimestamp() { - super(); - partialResult = new Object[3]; - resultCount = new LongWritable(); - resultSum = new DoubleWritable(); - resultVariance = new DoubleWritable(); - partialResult[0] = resultCount; - partialResult[1] = resultSum; - partialResult[2] = resultVariance; - initPartialResultInspector(); - } - - private void initPartialResultInspector() { - List<ObjectInspector> foi = new ArrayList<ObjectInspector>(); - foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector); - foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); - foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); - - List<String> fname = new ArrayList<String>(); - fname.add("count"); - fname.add("sum"); - fname.add("variance"); - - soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); - } - - private Aggregation getCurrentAggregationBuffer( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int row) { - VectorAggregationBufferRow mySet = aggregationBufferSets[row]; - Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); - return myagg; - } - - - @Override - public void aggregateInputSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - VectorizedRowBatch batch) throws HiveException { - - inputExpression.evaluate(batch); - - TimestampColumnVector inputColVector = (TimestampColumnVector)batch. 
- cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - if (inputColVector.isRepeating) { - if (inputColVector.noNulls || !inputColVector.isNull[0]) { - iterateRepeatingNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector.getDouble(0), batchSize); - } - } - else if (!batch.selectedInUse && inputColVector.noNulls) { - iterateNoSelectionNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize); - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, inputColVector.isNull); - } - else if (inputColVector.noNulls){ - iterateSelectionNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, batch.selected); - } - else { - iterateSelectionHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, inputColVector, batchSize, - inputColVector.isNull, batch.selected); - } - - } - - private void iterateRepeatingNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - double value, - int batchSize) { - - for (int i=0; i<batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - private void iterateSelectionHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - j); - int i = selected[j]; - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateSelectionNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - int[] selected) { - - for (int i=0; i< batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - double value = inputColVector.getDouble(selected[i]); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - private void iterateNoSelectionHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull) { - - for(int i=0;i<batchSize;++i) { - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - 
myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateNoSelectionNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - TimestampColumnVector inputColVector, - int batchSize) { - - for (int i=0; i<batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - if (myagg.isNull) { - myagg.init (); - } - double value = inputColVector.getDouble(i); - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - - @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) - throws HiveException { - - inputExpression.evaluate(batch); - - TimestampColumnVector inputColVector = (TimestampColumnVector)batch. - cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - Aggregation myagg = (Aggregation)agg; - - if (inputColVector.isRepeating) { - if (inputColVector.noNulls) { - iterateRepeatingNoNulls(myagg, inputColVector.getDouble(0), batchSize); - } - } - else if (!batch.selectedInUse && inputColVector.noNulls) { - iterateNoSelectionNoNulls(myagg, inputColVector, batchSize); - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNulls(myagg, inputColVector, batchSize, inputColVector.isNull); - } - else if (inputColVector.noNulls){ - iterateSelectionNoNulls(myagg, inputColVector, batchSize, batch.selected); - } - else { - iterateSelectionHasNulls(myagg, inputColVector, batchSize, inputColVector.isNull, batch.selected); - } - } - - private void iterateRepeatingNoNulls( - Aggregation myagg, - double value, - int batchSize) { - - if (myagg.isNull) { - myagg.init (); - } - - // TODO: conjure a formula w/o iterating - // - - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // We pulled out i=0 so we can remove the count > 1 check in the loop - for (int i=1; i<batchSize; ++i) { - myagg.sum += value; - myagg.count += 1; - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - private void iterateSelectionHasNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - int i = selected[j]; - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateSelectionNoNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - int[] selected) { - - if (myagg.isNull) { - myagg.init (); - } - - double value = inputColVector.getDouble(selected[0]); - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // i=0 was pulled out to remove the count > 1 check in the loop - // - for (int i=1; i< batchSize; ++i) { - value = inputColVector.getDouble(selected[i]); - myagg.sum += value; - myagg.count += 1; - double t = 
myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - private void iterateNoSelectionHasNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize, - boolean[] isNull) { - - for(int i=0;i<batchSize;++i) { - if (!isNull[i]) { - double value = inputColVector.getDouble(i); - if (myagg.isNull) { - myagg.init (); - } - myagg.sum += value; - myagg.count += 1; - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - } - } - - private void iterateNoSelectionNoNulls( - Aggregation myagg, - TimestampColumnVector inputColVector, - int batchSize) { - - if (myagg.isNull) { - myagg.init (); - } - - double value = inputColVector.getDouble(0); - myagg.sum += value; - myagg.count += 1; - - if(myagg.count > 1) { - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - - // i=0 was pulled out to remove count > 1 check - for (int i=1; i<batchSize; ++i) { - value = inputColVector.getDouble(i); - myagg.sum += value; - myagg.count += 1; - double t = myagg.count*value - myagg.sum; - myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1)); - } - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new Aggregation(); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - Aggregation myAgg = (Aggregation) agg; - myAgg.reset(); - } - - @Override - public Object evaluateOutput( - AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - assert(0 < myagg.count); - resultCount.set (myagg.count); - resultSum.set (myagg.sum); - resultVariance.set (myagg.variance); - return partialResult; - } - } - @Override - public ObjectInspector getOutputObjectInspector() { - return soi; - } - - @Override - public long getAggregationBufferFixedSize() { - JavaDataModel model = JavaDataModel.get(); - return JavaDataModel.alignUp( - model.object() + - model.primitive2()*3+ - model.primitive1(), - model.memoryAlign()); - } - - @Override - public void init(AggregationDesc desc) throws HiveException { - // No-op - } - - public VectorExpression getInputExpression() { - return inputExpression; - } - - public void setInputExpression(VectorExpression inputExpression) { - this.inputExpression = inputExpression; - } -} - http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java index 3b3624d..14ba646 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector.udf; import java.sql.Date; import java.sql.Timestamp; +import java.util.Map; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; @@ -37,8 +38,10 @@ import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.HiveCharWritable; import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBinaryObjectInspector; import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable; @@ -61,7 +64,8 @@ public class VectorUDFAdaptor extends VectorExpression { private transient GenericUDF genericUDF; private transient GenericUDF.DeferredObject[] deferredChildren; - private transient ObjectInspector outputOI; + private transient TypeInfo outputTypeInfo; + private transient VectorAssignRow outputVectorAssignRow; private transient ObjectInspector[] childrenOIs; private transient VectorExpressionWriter[] writers; @@ -95,8 +99,9 @@ public class VectorUDFAdaptor extends VectorExpression { if (context != null) { context.setup(genericUDF); } - outputOI = VectorExpressionWriterFactory.genVectorExpressionWritable(expr) - .getObjectInspector(); + outputTypeInfo = expr.getTypeInfo(); + outputVectorAssignRow = new VectorAssignRow(); + outputVectorAssignRow.init(outputTypeInfo, outputColumn); genericUDF.initialize(childrenOIs); @@ -207,163 +212,9 @@ public class VectorUDFAdaptor extends VectorExpression { result = null; } - // set output column vector entry - if (result == null) { - b.cols[outputColumn].noNulls = false; - b.cols[outputColumn].isNull[i] = true; - } else { - b.cols[outputColumn].isNull[i] = false; - setOutputCol(b.cols[outputColumn], i, result); - } - } - - private void setOutputCol(ColumnVector colVec, int i, Object value) { - - /* Depending on the output type, get the value, cast the result to the - * correct type if needed, and assign the result into the output vector. - */ - if (outputOI instanceof WritableStringObjectInspector) { - BytesColumnVector bv = (BytesColumnVector) colVec; - Text t; - if (value instanceof String) { - t = new Text((String) value); - } else { - t = ((WritableStringObjectInspector) outputOI).getPrimitiveWritableObject(value); - } - bv.setVal(i, t.getBytes(), 0, t.getLength()); - } else if (outputOI instanceof WritableHiveCharObjectInspector) { - WritableHiveCharObjectInspector writableHiveCharObjectOI = (WritableHiveCharObjectInspector) outputOI; - int maxLength = ((CharTypeInfo) writableHiveCharObjectOI.getTypeInfo()).getLength(); - BytesColumnVector bv = (BytesColumnVector) colVec; - - HiveCharWritable hiveCharWritable; - if (value instanceof HiveCharWritable) { - hiveCharWritable = ((HiveCharWritable) value); - } else { - hiveCharWritable = writableHiveCharObjectOI.getPrimitiveWritableObject(value); - } - Text t = hiveCharWritable.getTextValue(); - - // In vector mode, we stored CHAR as unpadded. 
- StringExpr.rightTrimAndTruncate(bv, i, t.getBytes(), 0, t.getLength(), maxLength); - } else if (outputOI instanceof WritableHiveVarcharObjectInspector) { - WritableHiveVarcharObjectInspector writableHiveVarcharObjectOI = (WritableHiveVarcharObjectInspector) outputOI; - int maxLength = ((VarcharTypeInfo) writableHiveVarcharObjectOI.getTypeInfo()).getLength(); - BytesColumnVector bv = (BytesColumnVector) colVec; - - HiveVarcharWritable hiveVarcharWritable; - if (value instanceof HiveVarcharWritable) { - hiveVarcharWritable = ((HiveVarcharWritable) value); - } else { - hiveVarcharWritable = writableHiveVarcharObjectOI.getPrimitiveWritableObject(value); - } - Text t = hiveVarcharWritable.getTextValue(); - - StringExpr.truncate(bv, i, t.getBytes(), 0, t.getLength(), maxLength); - } else if (outputOI instanceof WritableIntObjectInspector) { - LongColumnVector lv = (LongColumnVector) colVec; - if (value instanceof Integer) { - lv.vector[i] = (Integer) value; - } else { - lv.vector[i] = ((WritableIntObjectInspector) outputOI).get(value); - } - } else if (outputOI instanceof WritableLongObjectInspector) { - LongColumnVector lv = (LongColumnVector) colVec; - if (value instanceof Long) { - lv.vector[i] = (Long) value; - } else { - lv.vector[i] = ((WritableLongObjectInspector) outputOI).get(value); - } - } else if (outputOI instanceof WritableDoubleObjectInspector) { - DoubleColumnVector dv = (DoubleColumnVector) colVec; - if (value instanceof Double) { - dv.vector[i] = (Double) value; - } else { - dv.vector[i] = ((WritableDoubleObjectInspector) outputOI).get(value); - } - } else if (outputOI instanceof WritableFloatObjectInspector) { - DoubleColumnVector dv = (DoubleColumnVector) colVec; - if (value instanceof Float) { - dv.vector[i] = (Float) value; - } else { - dv.vector[i] = ((WritableFloatObjectInspector) outputOI).get(value); - } - } else if (outputOI instanceof WritableShortObjectInspector) { - LongColumnVector lv = (LongColumnVector) colVec; - if (value instanceof Short) { - lv.vector[i] = (Short) value; - } else { - lv.vector[i] = ((WritableShortObjectInspector) outputOI).get(value); - } - } else if (outputOI instanceof WritableByteObjectInspector) { - LongColumnVector lv = (LongColumnVector) colVec; - if (value instanceof Byte) { - lv.vector[i] = (Byte) value; - } else { - lv.vector[i] = ((WritableByteObjectInspector) outputOI).get(value); - } - } else if (outputOI instanceof WritableTimestampObjectInspector) { - TimestampColumnVector tv = (TimestampColumnVector) colVec; - Timestamp ts; - if (value instanceof Timestamp) { - ts = (Timestamp) value; - } else { - ts = ((WritableTimestampObjectInspector) outputOI).getPrimitiveJavaObject(value); - } - tv.set(i, ts); - } else if (outputOI instanceof WritableDateObjectInspector) { - LongColumnVector lv = (LongColumnVector) colVec; - Date ts; - if (value instanceof Date) { - ts = (Date) value; - } else { - ts = ((WritableDateObjectInspector) outputOI).getPrimitiveJavaObject(value); - } - long l = DateWritable.dateToDays(ts); - lv.vector[i] = l; - } else if (outputOI instanceof WritableBooleanObjectInspector) { - LongColumnVector lv = (LongColumnVector) colVec; - if (value instanceof Boolean) { - lv.vector[i] = (Boolean) value ? 1 : 0; - } else { - lv.vector[i] = ((WritableBooleanObjectInspector) outputOI).get(value) ? 
1 : 0; - } - } else if (outputOI instanceof WritableHiveDecimalObjectInspector) { - DecimalColumnVector dcv = (DecimalColumnVector) colVec; - if (value instanceof HiveDecimal) { - dcv.set(i, (HiveDecimal) value); - } else { - HiveDecimal hd = ((WritableHiveDecimalObjectInspector) outputOI).getPrimitiveJavaObject(value); - dcv.set(i, hd); - } - } else if (outputOI instanceof WritableBinaryObjectInspector) { - BytesWritable bw = (BytesWritable) value; - BytesColumnVector bv = (BytesColumnVector) colVec; - bv.setVal(i, bw.getBytes(), 0, bw.getLength()); - } else if (outputOI instanceof WritableHiveIntervalYearMonthObjectInspector) { - LongColumnVector lv = (LongColumnVector) colVec; - HiveIntervalYearMonth iym; - if (value instanceof HiveIntervalYearMonth) { - iym = (HiveIntervalYearMonth) value; - } else { - iym = ((WritableHiveIntervalYearMonthObjectInspector) outputOI).getPrimitiveJavaObject(value); - } - long l = iym.getTotalMonths(); - lv.vector[i] = l; - } else if (outputOI instanceof WritableHiveIntervalDayTimeObjectInspector) { - IntervalDayTimeColumnVector idtv = (IntervalDayTimeColumnVector) colVec; - HiveIntervalDayTime idt; - if (value instanceof HiveIntervalDayTime) { - idt = (HiveIntervalDayTime) value; - } else { - idt = ((WritableHiveIntervalDayTimeObjectInspector) outputOI).getPrimitiveJavaObject(value); - } - idtv.set(i, idt); - } else { - throw new RuntimeException("Unhandled object type " + outputOI.getTypeName() + - " inspector class " + outputOI.getClass().getName() + - " value class " + value.getClass().getName()); - } + // Set output column vector entry. Since we have one output column, the logical index = 0. + outputVectorAssignRow.assignRowColumn( + b, /* batchIndex */ i, /* logicalColumnIndex */ 0, result); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 72bdc71..933e47d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -263,6 +263,8 @@ public class Vectorizer implements PhysicalPlanResolver { private boolean useVectorDeserialize; private boolean useRowDeserialize; private boolean isReduceVectorizationEnabled; + private boolean isVectorizationComplexTypesEnabled; + private boolean isVectorizationGroupByComplexTypesEnabled; private boolean isSchemaEvolution; @@ -1379,6 +1381,8 @@ public class Vectorizer implements PhysicalPlanResolver { try { ret = validateMapWorkOperator(op, mapWork, isTezOrSpark); } catch (Exception e) { + String oneLineStackTrace = VectorizationContext.getStackTraceAsSingleLine(e); + LOG.info(oneLineStackTrace); throw new SemanticException(e); } if (!ret) { @@ -1699,6 +1703,13 @@ public class Vectorizer implements PhysicalPlanResolver { HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED); + isVectorizationComplexTypesEnabled = + HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_VECTORIZATION_COMPLEX_TYPES_ENABLED); + isVectorizationGroupByComplexTypesEnabled = + HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_COMPLEX_TYPES_ENABLED); + isSchemaEvolution = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION); @@ -1872,7 
+1883,8 @@ public class Vectorizer implements PhysicalPlanResolver { private boolean validateMapJoinDesc(MapJoinDesc desc) { byte posBigTable = (byte) desc.getPosBigTable(); List<ExprNodeDesc> filterExprs = desc.getFilters().get(posBigTable); - if (!validateExprNodeDesc(filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER)) { + if (!validateExprNodeDesc( + filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER, /* allowComplex */ true)) { return false; } List<ExprNodeDesc> keyExprs = desc.getKeys().get(posBigTable); @@ -1903,7 +1915,8 @@ public class Vectorizer implements PhysicalPlanResolver { List<ExprNodeDesc> filterExprs = desc.getFilters().get(tag); List<ExprNodeDesc> keyExprs = desc.getKeys().get(tag); List<ExprNodeDesc> valueExprs = desc.getExprs().get(tag); - return validateExprNodeDesc(filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER) && + return validateExprNodeDesc( + filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER, /* allowComplex */ true) && validateExprNodeDesc(keyExprs, "Key") && validateExprNodeDesc(valueExprs, "Value"); } @@ -1928,7 +1941,8 @@ public class Vectorizer implements PhysicalPlanResolver { private boolean validateFilterOperator(FilterOperator op) { ExprNodeDesc desc = op.getConf().getPredicate(); - return validateExprNodeDesc(desc, "Predicate", VectorExpressionDescriptor.Mode.FILTER); + return validateExprNodeDesc( + desc, "Predicate", VectorExpressionDescriptor.Mode.FILTER, /* allowComplex */ true); } private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTezOrSpark) { @@ -1938,7 +1952,7 @@ public class Vectorizer implements PhysicalPlanResolver { setOperatorIssue("DISTINCT not supported"); return false; } - boolean ret = validateExprNodeDesc(desc.getKeys(), "Key"); + boolean ret = validateExprNodeDescNoComplex(desc.getKeys(), "Key"); if (!ret) { return false; } @@ -2045,12 +2059,12 @@ public class Vectorizer implements PhysicalPlanResolver { VectorGroupByDesc.groupByDescModeToVectorProcessingMode(desc.getMode(), hasKeys); if (desc.isGroupingSetsPresent() && (processingMode != ProcessingMode.HASH && processingMode != ProcessingMode.STREAMING)) { - LOG.info("Vectorized GROUPING SETS only expected for HASH and STREAMING processing modes"); + setOperatorIssue("Vectorized GROUPING SETS only expected for HASH and STREAMING processing modes"); return false; } Pair<Boolean,Boolean> retPair = - validateAggregationDescs(desc.getAggregators(), processingMode, hasKeys); + validateAggregationDescs(desc.getAggregators(), desc.getMode(), hasKeys); if (!retPair.left) { return false; } @@ -2064,6 +2078,9 @@ public class Vectorizer implements PhysicalPlanResolver { vectorDesc.setProcessingMode(processingMode); + vectorDesc.setIsVectorizationComplexTypesEnabled(isVectorizationComplexTypesEnabled); + vectorDesc.setIsVectorizationGroupByComplexTypesEnabled(isVectorizationGroupByComplexTypesEnabled); + LOG.info("Vector GROUP BY operator will use processing mode " + processingMode.name() + ", isVectorOutput " + vectorDesc.isVectorOutput()); @@ -2075,14 +2092,21 @@ public class Vectorizer implements PhysicalPlanResolver { } private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, String expressionTitle) { - return validateExprNodeDesc(descs, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION); + return validateExprNodeDesc( + descs, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION, /* allowComplex */ true); + } + + private boolean validateExprNodeDescNoComplex(List<ExprNodeDesc> 
descs, String expressionTitle) { + return validateExprNodeDesc( + descs, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION, /* allowComplex */ false); } private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, String expressionTitle, - VectorExpressionDescriptor.Mode mode) { + VectorExpressionDescriptor.Mode mode, + boolean allowComplex) { for (ExprNodeDesc d : descs) { - boolean ret = validateExprNodeDesc(d, expressionTitle, mode); + boolean ret = validateExprNodeDesc(d, expressionTitle, mode, allowComplex); if (!ret) { return false; } @@ -2091,10 +2115,10 @@ public class Vectorizer implements PhysicalPlanResolver { } private Pair<Boolean,Boolean> validateAggregationDescs(List<AggregationDesc> descs, - ProcessingMode processingMode, boolean hasKeys) { + GroupByDesc.Mode groupByMode, boolean hasKeys) { boolean outputIsPrimitive = true; for (AggregationDesc d : descs) { - Pair<Boolean,Boolean> retPair = validateAggregationDesc(d, processingMode, hasKeys); + Pair<Boolean,Boolean> retPair = validateAggregationDesc(d, groupByMode, hasKeys); if (!retPair.left) { return retPair; } @@ -2106,7 +2130,7 @@ public class Vectorizer implements PhysicalPlanResolver { } private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, String expressionTitle, - VectorExpressionDescriptor.Mode mode) { + VectorExpressionDescriptor.Mode mode, boolean allowComplex) { if (desc instanceof ExprNodeColumnDesc) { ExprNodeColumnDesc c = (ExprNodeColumnDesc) desc; // Currently, we do not support vectorized virtual columns (see HIVE-5570). @@ -2116,9 +2140,11 @@ public class Vectorizer implements PhysicalPlanResolver { } } String typeName = desc.getTypeInfo().getTypeName(); - boolean ret = validateDataType(typeName, mode); + boolean ret = validateDataType(typeName, mode, allowComplex && isVectorizationComplexTypesEnabled); if (!ret) { - setExpressionIssue(expressionTitle, "Data type " + typeName + " of " + desc.toString() + " not supported"); + setExpressionIssue(expressionTitle, + getValidateDataTypeErrorMsg( + typeName, mode, allowComplex, isVectorizationComplexTypesEnabled)); return false; } boolean isInExpression = false; @@ -2144,7 +2170,8 @@ public class Vectorizer implements PhysicalPlanResolver { for (ExprNodeDesc d : desc.getChildren()) { // Don't restrict child expressions for projection. // Always use loose FILTER mode. 
@@ -2106,7 +2130,7 @@ public class Vectorizer implements PhysicalPlanResolver {
   }

   private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, String expressionTitle,
-      VectorExpressionDescriptor.Mode mode) {
+      VectorExpressionDescriptor.Mode mode, boolean allowComplex) {
     if (desc instanceof ExprNodeColumnDesc) {
       ExprNodeColumnDesc c = (ExprNodeColumnDesc) desc;
       // Currently, we do not support vectorized virtual columns (see HIVE-5570).
@@ -2116,9 +2140,11 @@
       }
     }
     String typeName = desc.getTypeInfo().getTypeName();
-    boolean ret = validateDataType(typeName, mode);
+    boolean ret = validateDataType(typeName, mode, allowComplex && isVectorizationComplexTypesEnabled);
     if (!ret) {
-      setExpressionIssue(expressionTitle, "Data type " + typeName + " of " + desc.toString() + " not supported");
+      setExpressionIssue(expressionTitle,
+          getValidateDataTypeErrorMsg(
+              typeName, mode, allowComplex, isVectorizationComplexTypesEnabled));
       return false;
     }
     boolean isInExpression = false;
@@ -2144,7 +2170,8 @@
       for (ExprNodeDesc d : desc.getChildren()) {
         // Don't restrict child expressions for projection.
        // Always use loose FILTER mode.
-        if (!validateExprNodeDescRecursive(d, expressionTitle, VectorExpressionDescriptor.Mode.FILTER)) {
+        if (!validateExprNodeDescRecursive(
+            d, expressionTitle, VectorExpressionDescriptor.Mode.FILTER, /* allowComplex */ true)) {
           return false;
         }
       }
@@ -2195,12 +2222,13 @@ public class Vectorizer implements PhysicalPlanResolver {
   }

   private boolean validateExprNodeDesc(ExprNodeDesc desc, String expressionTitle) {
-    return validateExprNodeDesc(desc, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION);
+    return validateExprNodeDesc(
+        desc, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION, /* allowComplex */ true);
   }

   boolean validateExprNodeDesc(ExprNodeDesc desc, String expressionTitle,
-      VectorExpressionDescriptor.Mode mode) {
-    if (!validateExprNodeDescRecursive(desc, expressionTitle, mode)) {
+      VectorExpressionDescriptor.Mode mode, boolean allowComplex) {
+    if (!validateExprNodeDescRecursive(desc, expressionTitle, mode, allowComplex)) {
       return false;
     }
     try {
@@ -2239,12 +2267,12 @@ public class Vectorizer implements PhysicalPlanResolver {
     return true;
   }

-  public static ObjectInspector.Category aggregationOutputCategory(VectorAggregateExpression vectorAggrExpr) {
+  public static Category aggregationOutputCategory(VectorAggregateExpression vectorAggrExpr) {
     ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector();
     return outputObjInspector.getCategory();
   }

-  private Pair<Boolean,Boolean> validateAggregationDesc(AggregationDesc aggDesc, ProcessingMode processingMode,
+  private Pair<Boolean,Boolean> validateAggregationDesc(AggregationDesc aggDesc, GroupByDesc.Mode groupByMode,
       boolean hasKeys) {

     String udfName = aggDesc.getGenericUDAFName().toLowerCase();
@@ -2253,12 +2281,16 @@ public class Vectorizer implements PhysicalPlanResolver {
       return new Pair<Boolean,Boolean>(false, false);
     }
     /*
+    // The planner seems to pull this one out.
     if (aggDesc.getDistinct()) {
       setExpressionIssue("Aggregation Function", "DISTINCT not supported");
       return new Pair<Boolean,Boolean>(false, false);
     }
     */
-    if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters(), "Aggregation Function UDF " + udfName + " parameter")) {
+
+    ArrayList<ExprNodeDesc> parameters = aggDesc.getParameters();
+
+    if (parameters != null && !validateExprNodeDesc(parameters, "Aggregation Function UDF " + udfName + " parameter")) {
       return new Pair<Boolean,Boolean>(false, false);
     }
@@ -2280,27 +2312,90 @@ public class Vectorizer implements PhysicalPlanResolver {
           " vector expression " + vectorAggrExpr.toString());
     }

-    ObjectInspector.Category outputCategory = aggregationOutputCategory(vectorAggrExpr);
-    boolean outputIsPrimitive = (outputCategory == ObjectInspector.Category.PRIMITIVE);
-    if (processingMode == ProcessingMode.MERGE_PARTIAL &&
-        hasKeys &&
-        !outputIsPrimitive) {
-      setOperatorIssue("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types");
-      return new Pair<Boolean,Boolean>(false, false);
+    boolean canVectorizeComplexType =
+        (isVectorizationComplexTypesEnabled && isVectorizationGroupByComplexTypesEnabled);
+
+    boolean isVectorOutput;
+    if (canVectorizeComplexType) {
+      isVectorOutput = true;
+    } else {
+
+      // Do complex input type checking...
+      boolean inputIsPrimitive;
+      if (parameters == null || parameters.size() == 0) {
+        inputIsPrimitive = true;  // Pretend for COUNT(*)
+      } else {
+
+        // Multi-input should have been eliminated earlier.
+        // Preconditions.checkState(parameters.size() == 1);
+
+        final Category inputCategory = parameters.get(0).getTypeInfo().getCategory();
+        inputIsPrimitive = (inputCategory == Category.PRIMITIVE);
+      }
+
+      if (!inputIsPrimitive) {
+        setOperatorIssue("Cannot vectorize GROUP BY with aggregation complex type inputs in " +
+            aggDesc.getExprString() + " since " +
+            GroupByDesc.getComplexTypeWithGroupByEnabledCondition(
+                isVectorizationComplexTypesEnabled, isVectorizationGroupByComplexTypesEnabled));
+        return new Pair<Boolean,Boolean>(false, false);
+      }
+
+      // Now, look at the output.  If the output is complex, we switch to row-mode for all child
+      // operators...
+      isVectorOutput = (aggregationOutputCategory(vectorAggrExpr) == Category.PRIMITIVE);
     }

-    return new Pair<Boolean,Boolean>(true, outputIsPrimitive);
+    return new Pair<Boolean,Boolean>(true, isVectorOutput);
   }

-  public static boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) {
+  public static boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode,
+      boolean allowComplex) {
+
     type = type.toLowerCase();
     boolean result = supportedDataTypesPattern.matcher(type).matches();
     if (result && mode == VectorExpressionDescriptor.Mode.PROJECTION && type.equals("void")) {
       return false;
     }
+
+    if (!result) {
+      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(type);
+      if (typeInfo.getCategory() != Category.PRIMITIVE) {
+        if (allowComplex) {
+          return true;
+        }
+      }
+    }
     return result;
   }

+  public static String getValidateDataTypeErrorMsg(String type, VectorExpressionDescriptor.Mode mode,
+      boolean allowComplex, boolean isVectorizationComplexTypesEnabled) {
+
+    type = type.toLowerCase();
+    boolean result = supportedDataTypesPattern.matcher(type).matches();
+    if (result && mode == VectorExpressionDescriptor.Mode.PROJECTION && type.equals("void")) {
+      return "Vectorizing data type void not supported when mode = PROJECTION";
+    }
+
+    if (!result) {
+      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(type);
+      if (typeInfo.getCategory() != Category.PRIMITIVE) {
+        if (allowComplex && isVectorizationComplexTypesEnabled) {
+          return null;
+        } else if (!allowComplex) {
+          return "Vectorizing complex type " + typeInfo.getCategory() + " not supported";
+        } else {
+          return "Vectorizing complex type " + typeInfo.getCategory() + " not enabled (" +
+              type + ") since " +
+              GroupByDesc.getComplexTypeEnabledCondition(
+                  isVectorizationComplexTypesEnabled);
+        }
+      }
+    }
+    return (result ? null : "Vectorizing data type " + type + " not supported");
+  }
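
A rough illustration of the validateDataType/getValidateDataTypeErrorMsg pair above (hypothetical
calls, not part of the patch): a type name that fails supportedDataTypesPattern is no longer
rejected outright; if it parses to a non-primitive category and allowComplex is true it now
passes, and getValidateDataTypeErrorMsg produces the matching explanation otherwise.

    // "int" matches the supported-types pattern; "array<int>" does not, so it only
    // passes through the new complex-type branch when allowComplex is true.
    Vectorizer.validateDataType("int", VectorExpressionDescriptor.Mode.PROJECTION, false);        // true
    Vectorizer.validateDataType("array<int>", VectorExpressionDescriptor.Mode.PROJECTION, true);  // true
    Vectorizer.validateDataType("array<int>", VectorExpressionDescriptor.Mode.PROJECTION, false); // false
    Vectorizer.validateDataType("void", VectorExpressionDescriptor.Mode.PROJECTION, true);        // false
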

   private VectorizationContext getVectorizationContext(String contextName,
       VectorTaskColumnInfo vectorTaskColumnInfo) {

@@ -3482,7 +3577,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       }
       VectorAggregateExpression[] vecAggregators = vectorGroupByDesc.getAggregators();
       for (VectorAggregateExpression vecAggr : vecAggregators) {
-        if (usesVectorUDFAdaptor(vecAggr.inputExpression())) {
+        if (usesVectorUDFAdaptor(vecAggr.getInputExpression())) {
           vectorTaskColumnInfo.setUsesVectorUDFAdaptor(true);
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
index fe91ee7..45d100d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;

+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.udf.UDFType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
@@ -305,6 +306,21 @@ public class GroupByDesc extends AbstractOperatorDesc {
     this.isDistinct = isDistinct;
   }

+  @Override
+  public Object clone() {
+    ArrayList<java.lang.String> outputColumnNames = new ArrayList<>();
+    outputColumnNames.addAll(this.outputColumnNames);
+    ArrayList<ExprNodeDesc> keys = new ArrayList<>();
+    keys.addAll(this.keys);
+    ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators = new ArrayList<>();
+    aggregators.addAll(this.aggregators);
+    List<Integer> listGroupingSets = new ArrayList<>();
+    listGroupingSets.addAll(this.listGroupingSets);
+    return new GroupByDesc(this.mode, outputColumnNames, keys, aggregators,
+        this.groupByMemoryUsage, this.memoryThreshold, listGroupingSets, this.groupingSetsPresent,
+        this.groupingSetPosition, this.isDistinct);
+  }
+
   public class GroupByOperatorExplainVectorization extends OperatorExplainVectorization {

     private final GroupByDesc groupByDesc;
@@ -337,20 +353,42 @@ public class GroupByDesc extends AbstractOperatorDesc {
       return vectorGroupByDesc.isVectorOutput();
     }

+    @Explain(vectorization = Vectorization.OPERATOR, displayName = "vectorProcessingMode", explainLevels = { Level.DEFAULT, Level.EXTENDED })
+    public String getProcessingMode() {
+      return vectorGroupByDesc.getProcessingMode().name();
+    }
+
+    @Explain(vectorization = Vectorization.OPERATOR, displayName = "groupByMode", explainLevels = { Level.DEFAULT, Level.EXTENDED })
+    public String getGroupByMode() {
+      return groupByDesc.getMode().name();
+    }
+
     @Explain(vectorization = Vectorization.OPERATOR, displayName = "vectorOutputConditionsNotMet", explainLevels = { Level.DEFAULT, Level.EXTENDED })
     public List<String> getVectorOutputConditionsNotMet() {
       List<String> results = new ArrayList<String>();
+
+      boolean isVectorizationComplexTypesEnabled = vectorGroupByDesc.getIsVectorizationComplexTypesEnabled();
+      boolean isVectorizationGroupByComplexTypesEnabled = vectorGroupByDesc.getIsVectorizationGroupByComplexTypesEnabled();
+
+      if (isVectorizationComplexTypesEnabled && isVectorizationGroupByComplexTypesEnabled) {
+        return null;
+      }
+
       VectorAggregateExpression[] vecAggregators = vectorGroupByDesc.getAggregators();
       for (VectorAggregateExpression vecAggr : vecAggregators) {
         Category category = Vectorizer.aggregationOutputCategory(vecAggr);
         if (category != ObjectInspector.Category.PRIMITIVE) {
           results.add(
-              "Vector output of " + vecAggr.toString() + " output type " + category + " requires PRIMITIVE IS false");
+              "Vector output of " + vecAggr.toString() + " output type " + category + " requires PRIMITIVE type IS false");
         }
       }
       if (results.size() == 0) {
         return null;
       }
+
+      results.add(
+          getComplexTypeWithGroupByEnabledCondition(
+              isVectorizationComplexTypesEnabled, isVectorizationGroupByComplexTypesEnabled));
       return results;
     }

@@ -368,18 +406,21 @@ public class GroupByDesc extends AbstractOperatorDesc {
     return new GroupByOperatorExplainVectorization(this, vectorDesc);
   }

-  @Override
-  public Object clone() {
-    ArrayList<java.lang.String> outputColumnNames = new ArrayList<>();
-    outputColumnNames.addAll(this.outputColumnNames);
-    ArrayList<ExprNodeDesc> keys = new ArrayList<>();
-    keys.addAll(this.keys);
-    ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators = new ArrayList<>();
-    aggregators.addAll(this.aggregators);
-    List<Integer> listGroupingSets = new ArrayList<>();
-    listGroupingSets.addAll(this.listGroupingSets);
-    return new GroupByDesc(this.mode, outputColumnNames, keys, aggregators,
-        this.groupByMemoryUsage, this.memoryThreshold, listGroupingSets, this.groupingSetsPresent,
-        this.groupingSetPosition, this.isDistinct);
+  public static String getComplexTypeEnabledCondition(
+      boolean isVectorizationComplexTypesEnabled) {
+    return
+        HiveConf.ConfVars.HIVE_VECTORIZATION_COMPLEX_TYPES_ENABLED.varname +
+        " IS " + isVectorizationComplexTypesEnabled;
+  }
+
+  public static String getComplexTypeWithGroupByEnabledCondition(
+      boolean isVectorizationComplexTypesEnabled,
+      boolean isVectorizationGroupByComplexTypesEnabled) {
+    final boolean enabled =
+        (isVectorizationComplexTypesEnabled && isVectorizationGroupByComplexTypesEnabled);
+    return "(" +
+        HiveConf.ConfVars.HIVE_VECTORIZATION_COMPLEX_TYPES_ENABLED.varname + " " + isVectorizationComplexTypesEnabled +
+        " AND " +
+        HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_COMPLEX_TYPES_ENABLED.varname + " " + isVectorizationGroupByComplexTypesEnabled +
+        ") IS " + enabled;
   }
 }
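
For reference, a sample of the condition strings the two new helpers build (a sketch assuming the
two ConfVars resolve to hive.vectorized.complex.types.enabled and
hive.vectorized.groupby.complex.types.enabled; the exact varnames come from HiveConf):

    // getComplexTypeWithGroupByEnabledCondition(true, false) would yield roughly:
    //   (hive.vectorized.complex.types.enabled true AND
    //    hive.vectorized.groupby.complex.types.enabled false) IS false
    String condition = GroupByDesc.getComplexTypeWithGroupByEnabledCondition(true, false);
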
http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 2120400..8b99ae0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -299,27 +299,6 @@ public class MapWork extends BaseWork {
         isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid);
   }

-  private boolean checkVectorizerSupportedTypes(boolean hasLlap) {
-    for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
-      final String alias = entry.getKey();
-      Operator<? extends OperatorDesc> op = entry.getValue();
-      PartitionDesc partitionDesc = aliasToPartnInfo.get(alias);
-      if (op instanceof TableScanOperator && partitionDesc != null &&
-          partitionDesc.getTableDesc() != null) {
-        final TableScanOperator tsOp = (TableScanOperator) op;
-        final List<String> readColumnNames = tsOp.getNeededColumns();
-        final Properties props = partitionDesc.getTableDesc().getProperties();
-        final List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(
-            props.getProperty(serdeConstants.LIST_COLUMN_TYPES));
-        final List<String> allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos);
-        final List<String> allColumnNames = Utilities.getColumnNames(props);
-        hasLlap = Utilities.checkVectorizerSupportedTypes(readColumnNames, allColumnNames,
-            allColumnTypes);
-      }
-    }
-    return hasLlap;
-  }
-
   private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny,
       boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid) {
     if (!isLlapOn) return null; // LLAP IO is off, don't output.

http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java
index f9a8725..89d868d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java
@@ -65,6 +65,8 @@ public class VectorGroupByDesc extends AbstractVectorDesc {
   private VectorExpression[] keyExpressions;
   private VectorAggregateExpression[] aggregators;
   private int[] projectedOutputColumns;
+  private boolean isVectorizationComplexTypesEnabled;
+  private boolean isVectorizationGroupByComplexTypesEnabled;

   public VectorGroupByDesc() {
     this.processingMode = ProcessingMode.NONE;
@@ -110,6 +112,22 @@ public class VectorGroupByDesc extends AbstractVectorDesc {
     return projectedOutputColumns;
   }

+  public void setIsVectorizationComplexTypesEnabled(boolean isVectorizationComplexTypesEnabled) {
+    this.isVectorizationComplexTypesEnabled = isVectorizationComplexTypesEnabled;
+  }
+
+  public boolean getIsVectorizationComplexTypesEnabled() {
+    return isVectorizationComplexTypesEnabled;
+  }
+
+  public void setIsVectorizationGroupByComplexTypesEnabled(boolean isVectorizationGroupByComplexTypesEnabled) {
+    this.isVectorizationGroupByComplexTypesEnabled = isVectorizationGroupByComplexTypesEnabled;
+  }
+
+  public boolean getIsVectorizationGroupByComplexTypesEnabled() {
+    return isVectorizationGroupByComplexTypesEnabled;
+  }
+
   /**
    * Which ProcessingMode for VectorGroupByOperator?
    *
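
These two flags are plain plumbing: the Vectorizer hunk earlier records the session settings on
the descriptor so the EXPLAIN code in GroupByDesc can report them later. A minimal sketch
(hypothetical usage, not part of the patch):

    VectorGroupByDesc vectorDesc = new VectorGroupByDesc();
    vectorDesc.setIsVectorizationComplexTypesEnabled(true);
    vectorDesc.setIsVectorizationGroupByComplexTypesEnabled(false);
    // getVectorOutputConditionsNotMet() above short-circuits only when both are true:
    boolean bothEnabled = vectorDesc.getIsVectorizationComplexTypesEnabled()
        && vectorDesc.getIsVectorizationGroupByComplexTypesEnabled();   // false
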
http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
index a28f7e8..2ea426c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
@@ -259,16 +259,18 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
     }

     private DecimalTypeInfo deriveResultDecimalTypeInfo() {
-      int prec = inputOI.precision();
-      int scale = inputOI.scale();
+      return deriveResultDecimalTypeInfo(inputOI.precision(), inputOI.scale(), mode);
+    }
+
+    public static DecimalTypeInfo deriveResultDecimalTypeInfo(int precision, int scale, Mode mode) {
       if (mode == Mode.FINAL || mode == Mode.COMPLETE) {
-        int intPart = prec - scale;
+        int intPart = precision - scale;
         // The avg() result type has the same number of integer digits and 4 more decimal digits.
         scale = Math.min(scale + 4, HiveDecimal.MAX_SCALE - intPart);
         return TypeInfoFactory.getDecimalTypeInfo(intPart + scale, scale);
       } else {
         // For intermediate sum field
-        return GenericUDAFAverage.deriveSumFieldTypeInfo(prec, scale);
+        return GenericUDAFAverage.deriveSumFieldTypeInfo(precision, scale);
       }
     }
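
A worked example of the refactored avg() derivation (hypothetical caller, enclosing-class
qualifier omitted; not part of the patch), for an input of decimal(10,2) and
HiveDecimal.MAX_SCALE = 38:

    // FINAL/COMPLETE: intPart = 10 - 2 = 8; scale = min(2 + 4, 38 - 8) = 6  =>  decimal(14,6).
    DecimalTypeInfo finalType = deriveResultDecimalTypeInfo(10, 2, Mode.FINAL);
    // Other modes delegate to deriveSumFieldTypeInfo(10, 2) for the intermediate sum field.
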
http://git-wip-us.apache.org/repos/asf/hive/blob/92fbe256/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
index 6d3b92b..a041ffc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
@@ -211,15 +211,15 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
       super.init(m, parameters);
       result = new HiveDecimalWritable(0);
       inputOI = (PrimitiveObjectInspector) parameters[0];
-      // The output precision is 10 greater than the input which should cover at least
-      // 10b rows. The scale is the same as the input.
-      DecimalTypeInfo outputTypeInfo = null;
+
+      final DecimalTypeInfo outputTypeInfo;
       if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
-        int precision = Math.min(HiveDecimal.MAX_PRECISION, inputOI.precision() + 10);
-        outputTypeInfo = TypeInfoFactory.getDecimalTypeInfo(precision, inputOI.scale());
+        outputTypeInfo = getOutputDecimalTypeInfoForSum(inputOI.precision(), inputOI.scale(), mode);
       } else {
+        // No change.
         outputTypeInfo = (DecimalTypeInfo) inputOI.getTypeInfo();
       }
+
       ObjectInspector oi = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(outputTypeInfo);
       outputOI = (PrimitiveObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(
           oi, ObjectInspectorCopyOption.JAVA);
@@ -227,6 +227,21 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver {
       return oi;
     }

+    public static DecimalTypeInfo getOutputDecimalTypeInfoForSum(final int inputPrecision,
+        int inputScale, Mode mode) {
+
+      // The output precision is 10 greater than the input which should cover at least
+      // 10b rows. The scale is the same as the input.
+      DecimalTypeInfo outputTypeInfo = null;
+      if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
+        int precision = Math.min(HiveDecimal.MAX_PRECISION, inputPrecision + 10);
+        outputTypeInfo = TypeInfoFactory.getDecimalTypeInfo(precision, inputScale);
+      } else {
+        outputTypeInfo = TypeInfoFactory.getDecimalTypeInfo(inputPrecision, inputScale);
+      }
+      return outputTypeInfo;
+    }
+
     /** class for storing decimal sum value. */
     @AggregationType(estimable = false) // hard to know exactly for decimals
     static class SumHiveDecimalWritableAgg extends SumAgg<HiveDecimalWritable> {
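
A worked example of the new sum() helper above (hypothetical caller, enclosing-class qualifier
omitted; not part of the patch), for an input of decimal(10,2) and HiveDecimal.MAX_PRECISION = 38:

    // PARTIAL1/COMPLETE: precision = min(38, 10 + 10) = 20  =>  decimal(20,2).
    DecimalTypeInfo partial1 = getOutputDecimalTypeInfoForSum(10, 2, Mode.PARTIAL1);
    // PARTIAL2/FINAL: the input decimal type is passed through unchanged  =>  decimal(10,2).
    DecimalTypeInfo merged = getOutputDecimalTypeInfoForSum(10, 2, Mode.FINAL);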