This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch revert-9910-add-variance-function in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 48bed903895c6e75a7f5e031c2d4ff2b20f5788b Author: Seunghyun Lee <[email protected]> AuthorDate: Thu Dec 8 14:31:51 2022 -0800 Revert "Add Variance and Standard Deviation Aggregation Functions (#9910)" This reverts commit 852477bdf312607c9a0eba7278453b7957139466. --- .../apache/pinot/core/common/ObjectSerDeUtils.java | 26 +- .../function/AggregationFunctionFactory.java | 8 - .../function/CovarianceAggregationFunction.java | 31 +- .../function/VarianceAggregationFunction.java | 188 ------ .../utils/StatisticalAggregationFunctionUtils.java | 53 -- .../org/apache/pinot/queries/BaseQueriesTest.java | 16 +- .../pinot/queries/CovarianceQueriesTest.java | 461 +++++++++++++ .../pinot/queries/StatisticalQueriesTest.java | 749 --------------------- .../segment/local/customobject/VarianceTuple.java | 105 --- .../pinot/segment/spi/AggregationFunctionType.java | 4 - 10 files changed, 495 insertions(+), 1146 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index 6fbacc5fcd..e22a2e7fb0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -72,7 +72,6 @@ import org.apache.pinot.segment.local.customobject.LongLongPair; import org.apache.pinot.segment.local.customobject.MinMaxRangePair; import org.apache.pinot.segment.local.customobject.QuantileDigest; import org.apache.pinot.segment.local.customobject.StringLongPair; -import org.apache.pinot.segment.local.customobject.VarianceTuple; import org.apache.pinot.segment.local.utils.GeometrySerializer; import org.apache.pinot.spi.utils.BigDecimalUtils; import org.apache.pinot.spi.utils.ByteArray; @@ -124,8 +123,7 @@ public class ObjectSerDeUtils { FloatLongPair(29), DoubleLongPair(30), StringLongPair(31), - CovarianceTuple(32), - VarianceTuple(33); + CovarianceTuple(32); private final int _value; @@ -207,8 +205,6 @@ public class ObjectSerDeUtils { return ObjectType.StringLongPair; } else if (value instanceof CovarianceTuple) { return ObjectType.CovarianceTuple; - } else if (value instanceof VarianceTuple) { - return ObjectType.VarianceTuple; } else { throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName()); } @@ -466,23 +462,6 @@ public class ObjectSerDeUtils { } }; - public static final ObjectSerDe<VarianceTuple> VARIANCE_TUPLE_OBJECT_SER_DE = new ObjectSerDe<VarianceTuple>() { - @Override - public byte[] serialize(VarianceTuple varianceTuple) { - return varianceTuple.toBytes(); - } - - @Override - public VarianceTuple deserialize(byte[] bytes) { - return VarianceTuple.fromBytes(bytes); - } - - @Override - public VarianceTuple deserialize(ByteBuffer byteBuffer) { - return VarianceTuple.fromByteBuffer(byteBuffer); - } - }; - public static final ObjectSerDe<HyperLogLog> HYPER_LOG_LOG_SER_DE = new ObjectSerDe<HyperLogLog>() { @Override @@ -1212,8 +1191,7 @@ public class ObjectSerDeUtils { FLOAT_LONG_PAIR_SER_DE, DOUBLE_LONG_PAIR_SER_DE, STRING_LONG_PAIR_SER_DE, - COVARIANCE_TUPLE_OBJECT_SER_DE, - VARIANCE_TUPLE_OBJECT_SER_DE + COVARIANCE_TUPLE_OBJECT_SER_DE }; //@formatter:on diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 2758d193a2..4e1bda2025 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -277,14 +277,6 @@ public class AggregationFunctionFactory { return new BooleanAndAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled()); case BOOLOR: return new BooleanOrAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled()); - case VARPOP: - return new VarianceAggregationFunction(firstArgument, false, false); - case VARSAMP: - return new VarianceAggregationFunction(firstArgument, true, false); - case STDDEVPOP: - return new VarianceAggregationFunction(firstArgument, false, true); - case STDDEVSAMP: - return new VarianceAggregationFunction(firstArgument, true, true); default: throw new IllegalArgumentException(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java index 1676835bae..bd68235c48 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.query.aggregation.function; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -29,7 +30,6 @@ import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; -import org.apache.pinot.core.query.aggregation.utils.StatisticalAggregationFunctionUtils; import org.apache.pinot.segment.local.customobject.CovarianceTuple; import org.apache.pinot.segment.spi.AggregationFunctionType; @@ -99,8 +99,8 @@ public class CovarianceAggregationFunction implements AggregationFunction<Covari @Override public void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { - double[] values1 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression1); - double[] values2 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression2); + double[] values1 = getValSet(blockValSetMap, _expression1); + double[] values2 = getValSet(blockValSetMap, _expression2); double sumX = 0.0; double sumY = 0.0; @@ -134,11 +134,28 @@ public class CovarianceAggregationFunction implements AggregationFunction<Covari } } + private double[] getValSet(Map<ExpressionContext, BlockValSet> blockValSetMap, ExpressionContext expression) { + BlockValSet blockValSet = blockValSetMap.get(expression); + //TODO: Add MV support for covariance + Preconditions.checkState(blockValSet.isSingleValue(), + "Covariance function currently only supports single-valued column"); + switch (blockValSet.getValueType().getStoredType()) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + return blockValSet.getDoubleValuesSV(); + default: + throw new IllegalStateException( + "Cannot compute covariance for non-numeric type: " + blockValSet.getValueType()); + } + } + @Override public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { - double[] values1 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression1); - double[] values2 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression2); + double[] values1 = getValSet(blockValSetMap, _expression1); + double[] values2 = getValSet(blockValSetMap, _expression2); for (int i = 0; i < length; i++) { setGroupByResult(groupKeyArray[i], groupByResultHolder, values1[i], values2[i], values1[i] * values2[i], 1L); } @@ -147,8 +164,8 @@ public class CovarianceAggregationFunction implements AggregationFunction<Covari @Override public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { - double[] values1 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression1); - double[] values2 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression2); + double[] values1 = getValSet(blockValSetMap, _expression1); + double[] values2 = getValSet(blockValSetMap, _expression2); for (int i = 0; i < length; i++) { for (int groupKey : groupKeysArray[i]) { setGroupByResult(groupKey, groupByResultHolder, values1[i], values2[i], values1[i] * values2[i], 1L); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/VarianceAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/VarianceAggregationFunction.java deleted file mode 100644 index c86269b7ce..0000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/VarianceAggregationFunction.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.query.aggregation.function; - -import java.util.Map; -import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.common.BlockValSet; -import org.apache.pinot.core.query.aggregation.AggregationResultHolder; -import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; -import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; -import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; -import org.apache.pinot.core.query.aggregation.utils.StatisticalAggregationFunctionUtils; -import org.apache.pinot.segment.local.customobject.VarianceTuple; -import org.apache.pinot.segment.spi.AggregationFunctionType; - - -/** - * Aggregation function which computes Variance and Standard Deviation - * - * The algorithm to compute variance is based on "Updating Formulae and a Pairwise Algorithm for Computing - * Sample Variances" by Chan et al. Please refer to the "Parallel Algorithm" section from the following wiki: - * - https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm - */ -public class VarianceAggregationFunction extends BaseSingleInputAggregationFunction<VarianceTuple, Double> { - private static final double DEFAULT_FINAL_RESULT = Double.NEGATIVE_INFINITY; - protected final boolean _isSample; - - protected final boolean _isStdDev; - - public VarianceAggregationFunction(ExpressionContext expression, boolean isSample, boolean isStdDev) { - super(expression); - _isSample = isSample; - _isStdDev = isStdDev; - } - - @Override - public AggregationFunctionType getType() { - if (_isSample) { - return (_isStdDev) ? AggregationFunctionType.STDDEVSAMP : AggregationFunctionType.VARSAMP; - } - return (_isStdDev) ? AggregationFunctionType.STDDEVPOP : AggregationFunctionType.VARPOP; - } - - @Override - public AggregationResultHolder createAggregationResultHolder() { - return new ObjectAggregationResultHolder(); - } - - @Override - public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { - return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); - } - - @Override - public void aggregate(int length, AggregationResultHolder aggregationResultHolder, - Map<ExpressionContext, BlockValSet> blockValSetMap) { - double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression); - - long count = 0; - double sum = 0.0; - double variance = 0.0; - for (int i = 0; i < length; i++) { - count++; - sum += values[i]; - if (count > 1) { - variance = computeIntermediateVariance(count, sum, variance, values[i]); - } - } - setAggregationResult(aggregationResultHolder, length, sum, variance); - } - - private double computeIntermediateVariance(long count, double sum, double m2, double value) { - double t = count * value - sum; - m2 += (t * t) / (count * (count - 1)); - return m2; - } - - protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, long count, double sum, - double m2) { - VarianceTuple varianceTuple = aggregationResultHolder.getResult(); - if (varianceTuple == null) { - aggregationResultHolder.setValue(new VarianceTuple(count, sum, m2)); - } else { - varianceTuple.apply(count, sum, m2); - } - } - - protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, long count, double sum, - double m2) { - VarianceTuple varianceTuple = groupByResultHolder.getResult(groupKey); - if (varianceTuple == null) { - groupByResultHolder.setValueForKey(groupKey, new VarianceTuple(count, sum, m2)); - } else { - varianceTuple.apply(count, sum, m2); - } - } - - @Override - public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, - Map<ExpressionContext, BlockValSet> blockValSetMap) { - double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression); - for (int i = 0; i < length; i++) { - setGroupByResult(groupKeyArray[i], groupByResultHolder, 1L, values[i], 0.0); - } - } - - @Override - public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, - Map<ExpressionContext, BlockValSet> blockValSetMap) { - double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression); - for (int i = 0; i < length; i++) { - for (int groupKey : groupKeysArray[i]) { - setGroupByResult(groupKey, groupByResultHolder, 1L, values[i], 0.0); - } - } - } - - @Override - public VarianceTuple extractAggregationResult(AggregationResultHolder aggregationResultHolder) { - VarianceTuple varianceTuple = aggregationResultHolder.getResult(); - if (varianceTuple == null) { - return new VarianceTuple(0L, 0.0, 0.0); - } else { - return varianceTuple; - } - } - - @Override - public VarianceTuple extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { - return groupByResultHolder.getResult(groupKey); - } - - @Override - public VarianceTuple merge(VarianceTuple intermediateResult1, VarianceTuple intermediateResult2) { - intermediateResult1.apply(intermediateResult2); - return intermediateResult1; - } - - @Override - public DataSchema.ColumnDataType getIntermediateResultColumnType() { - return DataSchema.ColumnDataType.OBJECT; - } - - @Override - public DataSchema.ColumnDataType getFinalResultColumnType() { - return DataSchema.ColumnDataType.DOUBLE; - } - - @Override - public Double extractFinalResult(VarianceTuple varianceTuple) { - if (varianceTuple == null) { - return null; - } - long count = varianceTuple.getCount(); - if (count == 0L) { - return DEFAULT_FINAL_RESULT; - } else { - double variance = varianceTuple.getM2(); - if (_isSample) { - if (count - 1 == 0L) { - return DEFAULT_FINAL_RESULT; - } - double sampleVar = variance / (count - 1); - return (_isStdDev) ? Math.sqrt(sampleVar) : sampleVar; - } else { - double popVar = variance / count; - return (_isStdDev) ? Math.sqrt(popVar) : popVar; - } - } - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/StatisticalAggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/StatisticalAggregationFunctionUtils.java deleted file mode 100644 index b7a05de3a4..0000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/StatisticalAggregationFunctionUtils.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.query.aggregation.utils; - -import com.google.common.base.Preconditions; -import java.util.Map; -import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.core.common.BlockValSet; - - -/** - * Util class for statistical aggregation functions - * - * e.g. Variance, Covariance, Standard Deviation... - */ -public class StatisticalAggregationFunctionUtils { - private StatisticalAggregationFunctionUtils() { - } - - public static double[] getValSet(Map<ExpressionContext, BlockValSet> blockValSetMap, ExpressionContext expression) { - BlockValSet blockValSet = blockValSetMap.get(expression); - //TODO: Add MV support for covariance - Preconditions.checkState(blockValSet.isSingleValue(), - "Variance, Covariance, Standard Deviation function currently only supports single-valued column"); - switch (blockValSet.getValueType().getStoredType()) { - case INT: - case LONG: - case FLOAT: - case DOUBLE: - return blockValSet.getDoubleValuesSV(); - default: - throw new IllegalStateException( - "Cannot compute variance, covariance, or standard deviation for non-numeric type: " - + blockValSet.getValueType()); - } - } -} diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java index fa5a5068df..f359987780 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java @@ -111,7 +111,7 @@ public abstract class BaseQueriesTest { * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with * different index segments in the test and overriding getDistinctInstances. * This can be particularly useful to test statistical aggregation functions. - * @see StatisticalQueriesTest for an example use case. + * @see CovarianceQueriesTest for an example use case. */ protected BrokerResponseNative getBrokerResponse(String query) { return getBrokerResponse(query, PLAN_MAKER); @@ -125,7 +125,7 @@ public abstract class BaseQueriesTest { * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with * different index segments in the test and overriding getDistinctInstances. * This can be particularly useful to test statistical aggregation functions. - * @see StatisticalQueriesTest for an example use case. + * @see CovarianceQueriesTest for an example use case. */ protected BrokerResponseNative getBrokerResponseWithFilter(String query) { return getBrokerResponse(query + getFilter()); @@ -139,7 +139,7 @@ public abstract class BaseQueriesTest { * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with * different index segments in the test and overriding getDistinctInstances. * This can be particularly useful to test statistical aggregation functions. - * @see StatisticalQueriesTest for an example use case. + * @see CovarianceQueriesTest for an example use case. */ protected BrokerResponseNative getBrokerResponse(String query, PlanMaker planMaker) { return getBrokerResponse(query, planMaker, null); @@ -153,7 +153,7 @@ public abstract class BaseQueriesTest { * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with * different index segments in the test and overriding getDistinctInstances. * This can be particularly useful to test statistical aggregation functions. - * @see StatisticalQueriesTest for an example use case. + * @see CovarianceQueriesTest for an example use case. */ protected BrokerResponseNative getBrokerResponse(String query, @Nullable Map<String, String> extraQueryOptions) { return getBrokerResponse(query, PLAN_MAKER, extraQueryOptions); @@ -167,7 +167,7 @@ public abstract class BaseQueriesTest { * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with * different index segments in the test and overriding getDistinctInstances. * This can be particularly useful to test statistical aggregation functions. - * @see StatisticalQueriesTest for an example use case. + * @see CovarianceQueriesTest for an example use case. */ private BrokerResponseNative getBrokerResponse(String query, PlanMaker planMaker, @Nullable Map<String, String> extraQueryOptions) { @@ -191,7 +191,7 @@ public abstract class BaseQueriesTest { * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with * different index segments in the test and overriding getDistinctInstances. * This can be particularly useful to test statistical aggregation functions. - * @see StatisticalQueriesTest for an example use case. + * @see CovarianceQueriesTest for an example use case. */ private BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery, PlanMaker planMaker) { PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery); @@ -249,7 +249,7 @@ public abstract class BaseQueriesTest { * In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with * different index segments in the test and overriding getDistinctInstances. * This can be particularly useful to test statistical aggregation functions. - * @see StatisticalQueriesTest for an example use case. + * @see CovarianceQueriesTest for an example use case. */ protected BrokerResponseNative getBrokerResponseForOptimizedQuery(String query, @Nullable TableConfig config, @Nullable Schema schema) { @@ -286,7 +286,7 @@ public abstract class BaseQueriesTest { * The caller of this function should handle initializing 2 instances with different index segments in the test and * overriding getDistinctInstances. * This can be particularly useful to test statistical aggregation functions. - * @see StatisticalQueriesTest for an example use case. + * @see CovarianceQueriesTest for an example use case. */ private BrokerResponseNative getBrokerResponseDistinctInstances(PinotQuery pinotQuery, PlanMaker planMaker) { PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/CovarianceQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/CovarianceQueriesTest.java new file mode 100644 index 0000000000..0ab86180e8 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/queries/CovarianceQueriesTest.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.queries; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import org.apache.commons.io.FileUtils; +import org.apache.commons.math3.stat.correlation.Covariance; +import org.apache.commons.math3.util.Precision; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.operator.query.AggregationOperator; +import org.apache.pinot.core.operator.query.GroupByOperator; +import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; +import org.apache.pinot.segment.local.customobject.CovarianceTuple; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/** + * Queries test for covariance queries. + */ +public class CovarianceQueriesTest extends BaseQueriesTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "CovarianceQueriesTest"); + private static final String RAW_TABLE_NAME = "testTable"; + private static final String SEGMENT_NAME = "testSegment"; + + // test segments 1-4 evenly divide testSegment into 4 distinct segments + private static final String SEGMENT_NAME_1 = "testSegment1"; + private static final String SEGMENT_NAME_2 = "testSegment2"; + private static final String SEGMENT_NAME_3 = "testSegment3"; + private static final String SEGMENT_NAME_4 = "testSegment4"; + + private static final int NUM_RECORDS = 2000; + private static final int NUM_GROUPS = 10; + private static final int MAX_VALUE = 500; + private static final double RELATIVE_EPSILON = 0.0001; + private static final double DELTA = 0.0001; + + private static final String INT_COLUMN_X = "intColumnX"; + private static final String INT_COLUMN_Y = "intColumnY"; + private static final String DOUBLE_COLUMN_X = "doubleColumnX"; + private static final String DOUBLE_COLUMN_Y = "doubleColumnY"; + private static final String LONG_COLUMN = "longColumn"; + private static final String FLOAT_COLUMN = "floatColumn"; + private static final String GROUP_BY_COLUMN = "groupByColumn"; + + private static final Schema SCHEMA = + new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN_X, FieldSpec.DataType.INT) + .addSingleValueDimension(INT_COLUMN_Y, FieldSpec.DataType.INT) + .addSingleValueDimension(DOUBLE_COLUMN_X, FieldSpec.DataType.DOUBLE) + .addSingleValueDimension(DOUBLE_COLUMN_Y, FieldSpec.DataType.DOUBLE) + .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG) + .addSingleValueDimension(FLOAT_COLUMN, FieldSpec.DataType.FLOAT) + .addSingleValueDimension(GROUP_BY_COLUMN, FieldSpec.DataType.DOUBLE).build(); + private static final TableConfig TABLE_CONFIG = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + + private IndexSegment _indexSegment; + private List<IndexSegment> _indexSegments; + private List<List<IndexSegment>> _distinctInstances; + private int _sumIntX = 0; + private int _sumIntY = 0; + private int _sumIntXY = 0; + + private double _sumDoubleX = 0; + private double _sumDoubleY = 0; + private double _sumDoubleXY = 0; + + private long _sumLong = 0L; + private double _sumFloat = 0; + + private double _sumIntDouble = 0; + private long _sumIntLong = 0L; + private double _sumIntFloat = 0; + private double _sumDoubleLong = 0; + private double _sumDoubleFloat = 0; + private double _sumLongFloat = 0; + + private double _expectedCovIntXY; + private double _expectedCovDoubleXY; + private double _expectedCovIntDouble; + private double _expectedCovIntLong; + private double _expectedCovIntFloat; + private double _expectedCovDoubleLong; + private double _expectedCovDoubleFloat; + private double _expectedCovLongFloat; + + private double _expectedCovWithFilter; + + private final CovarianceTuple[] _expectedGroupByResultVer1 = new CovarianceTuple[NUM_GROUPS]; + private final CovarianceTuple[] _expectedGroupByResultVer2 = new CovarianceTuple[NUM_GROUPS]; + private final double[] _expectedFinalResultVer1 = new double[NUM_GROUPS]; + private final double[] _expectedFinalResultVer2 = new double[NUM_GROUPS]; + + private boolean _useIdenticalSegment = false; + + @Override + protected String getFilter() { + // filter out half of the rows based on group id + return " WHERE groupByColumn < " + (NUM_GROUPS / 2); + } + + @Override + protected IndexSegment getIndexSegment() { + return _indexSegment; + } + + @Override + protected List<IndexSegment> getIndexSegments() { + return _indexSegments; + } + + @Override + protected List<List<IndexSegment>> getDistinctInstances() { + if (_useIdenticalSegment) { + return Collections.singletonList(_indexSegments); + } + return _distinctInstances; + } + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteDirectory(INDEX_DIR); + + List<GenericRow> records = new ArrayList<>(NUM_RECORDS); + + Random rand = new Random(); + int[] intColX = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray(); + int[] intColY = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray(); + double[] doubleColX = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray(); + double[] doubleColY = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray(); + long[] longCol = rand.longs(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray(); + double[] floatCol = new double[NUM_RECORDS]; + double[] groupByCol = new double[NUM_RECORDS]; + + int groupSize = NUM_RECORDS / NUM_GROUPS; + double sumX = 0; + double sumY = 0; + double sumGroupBy = 0; + double sumXY = 0; + double sumXGroupBy = 0; + int groupByVal = 0; + + for (int i = 0; i < NUM_RECORDS; i++) { + GenericRow record = new GenericRow(); + int intX = intColX[i]; + int intY = intColY[i]; + double doubleX = doubleColX[i]; + double doubleY = doubleColY[i]; + long longVal = longCol[i]; + float floatVal = -MAX_VALUE + rand.nextFloat() * 2 * MAX_VALUE; + + // set up inner segment group by results + groupByVal = (int) Math.floor(i / groupSize); + if (i % groupSize == 0 && groupByVal > 0) { + _expectedGroupByResultVer1[groupByVal - 1] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize); + _expectedGroupByResultVer2[groupByVal - 1] = new CovarianceTuple(sumX, sumY, sumXY, groupSize); + sumX = 0; + sumY = 0; + sumGroupBy = 0; + sumXY = 0; + sumXGroupBy = 0; + } + + sumX += doubleX; + sumY += doubleY; + sumGroupBy += groupByVal; + sumXY += doubleX * doubleY; + sumXGroupBy += doubleX * groupByVal; + + floatCol[i] = floatVal; + groupByCol[i] = groupByVal; + + // calculate inner segment results + _sumIntX += intX; + _sumIntY += intY; + _sumDoubleX += doubleX; + _sumDoubleY += doubleY; + _sumLong += longVal; + _sumFloat += floatVal; + _sumIntXY += intX * intY; + _sumDoubleXY += doubleX * doubleY; + _sumIntDouble += intX * doubleX; + _sumIntLong += intX * longVal; + _sumIntFloat += intX * floatCol[i]; + _sumDoubleLong += doubleX * longVal; + _sumDoubleFloat += doubleX * floatCol[i]; + _sumLongFloat += longVal * floatCol[i]; + + record.putValue(INT_COLUMN_X, intX); + record.putValue(INT_COLUMN_Y, intY); + record.putValue(DOUBLE_COLUMN_X, doubleX); + record.putValue(DOUBLE_COLUMN_Y, doubleY); + record.putValue(LONG_COLUMN, longVal); + record.putValue(FLOAT_COLUMN, floatVal); + record.putValue(GROUP_BY_COLUMN, groupByVal); + records.add(record); + } + _expectedGroupByResultVer1[groupByVal] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize); + _expectedGroupByResultVer2[groupByVal] = new CovarianceTuple(sumX, sumY, sumXY, groupSize); + + // calculate inter segment result + Covariance cov = new Covariance(); + double[] newIntColX = Arrays.stream(intColX).asDoubleStream().toArray(); + double[] newIntColY = Arrays.stream(intColY).asDoubleStream().toArray(); + double[] newLongCol = Arrays.stream(longCol).asDoubleStream().toArray(); + _expectedCovIntXY = cov.covariance(newIntColX, newIntColY, false); + _expectedCovDoubleXY = cov.covariance(doubleColX, doubleColY, false); + _expectedCovIntDouble = cov.covariance(newIntColX, doubleColX, false); + _expectedCovIntLong = cov.covariance(newIntColX, newLongCol, false); + _expectedCovIntFloat = cov.covariance(newIntColX, floatCol, false); + _expectedCovDoubleLong = cov.covariance(doubleColX, newLongCol, false); + _expectedCovDoubleFloat = cov.covariance(doubleColX, floatCol, false); + _expectedCovLongFloat = cov.covariance(newLongCol, floatCol, false); + + double[] filteredX = Arrays.copyOfRange(doubleColX, 0, NUM_RECORDS / 2); + double[] filteredY = Arrays.copyOfRange(doubleColY, 0, NUM_RECORDS / 2); + _expectedCovWithFilter = cov.covariance(filteredX, filteredY, false); + + // calculate inter segment group by results + for (int i = 0; i < NUM_GROUPS; i++) { + double[] colX = Arrays.copyOfRange(doubleColX, i * groupSize, (i + 1) * groupSize); + double[] colGroupBy = Arrays.copyOfRange(groupByCol, i * groupSize, (i + 1) * groupSize); + double[] colY = Arrays.copyOfRange(doubleColY, i * groupSize, (i + 1) * groupSize); + _expectedFinalResultVer1[i] = cov.covariance(colX, colGroupBy, false); + _expectedFinalResultVer2[i] = cov.covariance(colX, colY, false); + } + + // generate testSegment + ImmutableSegment immutableSegment = setUpSingleSegment(records, SEGMENT_NAME); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); + + // divide testSegment into 4 distinct segments for distinct inter segment tests + // by doing so, we can avoid calculating global covariance again + _distinctInstances = new ArrayList<>(); + int segmentSize = NUM_RECORDS / 4; + ImmutableSegment immutableSegment1 = setUpSingleSegment(records.subList(0, segmentSize), SEGMENT_NAME_1); + ImmutableSegment immutableSegment2 = + setUpSingleSegment(records.subList(segmentSize, segmentSize * 2), SEGMENT_NAME_2); + ImmutableSegment immutableSegment3 = + setUpSingleSegment(records.subList(segmentSize * 2, segmentSize * 3), SEGMENT_NAME_3); + ImmutableSegment immutableSegment4 = + setUpSingleSegment(records.subList(segmentSize * 3, NUM_RECORDS), SEGMENT_NAME_4); + // generate 2 instances each with 2 distinct segments + _distinctInstances.add(Arrays.asList(immutableSegment1, immutableSegment2)); + _distinctInstances.add(Arrays.asList(immutableSegment3, immutableSegment4)); + } + + private ImmutableSegment setUpSingleSegment(List<GenericRow> recordSet, String segmentName) + throws Exception { + SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); + segmentGeneratorConfig.setTableName(RAW_TABLE_NAME); + segmentGeneratorConfig.setSegmentName(segmentName); + segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath()); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(segmentGeneratorConfig, new GenericRowRecordReader(recordSet)); + driver.build(); + + return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap); + } + + @Test + public void testAggregationOnly() { + // Inner Segment + String query = + "SELECT COVAR_POP(intColumnX, intColumnY), COVAR_POP(doubleColumnX, doubleColumnY), COVAR_POP(intColumnX, " + + "doubleColumnX), " + "COVAR_POP(intColumnX, longColumn), COVAR_POP(intColumnX, floatColumn), " + + "COVAR_POP(doubleColumnX, longColumn), COVAR_POP(doubleColumnX, floatColumn), COVAR_POP(longColumn, " + + "floatColumn) FROM testTable"; + AggregationOperator aggregationOperator = getOperator(query); + AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0, + NUM_RECORDS * 6, NUM_RECORDS); + List<Object> aggregationResult = resultsBlock.getResults(); + assertNotNull(aggregationResult); + checkWithPrecision((CovarianceTuple) aggregationResult.get(0), _sumIntX, _sumIntY, _sumIntXY, NUM_RECORDS); + checkWithPrecision((CovarianceTuple) aggregationResult.get(1), _sumDoubleX, _sumDoubleY, _sumDoubleXY, NUM_RECORDS); + checkWithPrecision((CovarianceTuple) aggregationResult.get(2), _sumIntX, _sumDoubleX, _sumIntDouble, NUM_RECORDS); + checkWithPrecision((CovarianceTuple) aggregationResult.get(3), _sumIntX, _sumLong, _sumIntLong, NUM_RECORDS); + checkWithPrecision((CovarianceTuple) aggregationResult.get(4), _sumIntX, _sumFloat, _sumIntFloat, NUM_RECORDS); + checkWithPrecision((CovarianceTuple) aggregationResult.get(5), _sumDoubleX, _sumLong, _sumDoubleLong, NUM_RECORDS); + checkWithPrecision((CovarianceTuple) aggregationResult.get(6), _sumDoubleX, _sumFloat, _sumDoubleFloat, + NUM_RECORDS); + checkWithPrecision((CovarianceTuple) aggregationResult.get(7), _sumLong, _sumFloat, _sumLongFloat, NUM_RECORDS); + + // Inter segments with 4 identical segments (2 instances each having 2 identical segments) + _useIdenticalSegment = true; + BrokerResponseNative brokerResponse = getBrokerResponse(query); + _useIdenticalSegment = false; + assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS); + assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); + assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 6 * NUM_RECORDS); + assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS); + checkResultTableWithPrecision(brokerResponse); + + // Inter segments with 4 distinct segments (2 instances each having 2 distinct segments) + brokerResponse = getBrokerResponse(query); + assertEquals(brokerResponse.getNumDocsScanned(), NUM_RECORDS); + assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); + assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 6 * NUM_RECORDS); + assertEquals(brokerResponse.getTotalDocs(), NUM_RECORDS); + checkResultTableWithPrecision(brokerResponse); + + // Inter segments with 4 identical segments with filter + _useIdenticalSegment = true; + query = "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable" + getFilter(); + brokerResponse = getBrokerResponse(query); + _useIdenticalSegment = false; + assertEquals(brokerResponse.getNumDocsScanned(), 2 * NUM_RECORDS); + assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); + assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * NUM_RECORDS); + assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS); + Object[] results = brokerResponse.getResultTable().getRows().get(0); + assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], _expectedCovWithFilter, RELATIVE_EPSILON)); + } + + @Test + public void testAggregationGroupBy() { + + // Inner Segment + // case 1: (col1, groupByCol) group by groupByCol => all covariances are 0's + String query = + "SELECT COVAR_POP(doubleColumnX, groupByColumn) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn"; + GroupByOperator groupByOperator = getOperator(query); + GroupByResultsBlock resultsBlock = groupByOperator.nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0, + NUM_RECORDS * 2, NUM_RECORDS); + AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); + assertNotNull(aggregationGroupByResult); + for (int i = 0; i < NUM_GROUPS; i++) { + CovarianceTuple actualCovTuple = (CovarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i); + CovarianceTuple expectedCovTuple = _expectedGroupByResultVer1[i]; + checkWithPrecision(actualCovTuple, expectedCovTuple); + } + + // Inter Segment with 4 identical segments + _useIdenticalSegment = true; + BrokerResponseNative brokerResponse = getBrokerResponse(query); + checkGroupByResults(brokerResponse, _expectedFinalResultVer1); + _useIdenticalSegment = false; + // Inter Segment with 4 distinct segments + brokerResponse = getBrokerResponse(query); + checkGroupByResults(brokerResponse, _expectedFinalResultVer1); + + // Inner Segment + // case 2: COVAR_POP(col1, col2) group by groupByCol => nondeterministic cov + query = + "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn"; + groupByOperator = getOperator(query); + resultsBlock = groupByOperator.nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0, + NUM_RECORDS * 3, NUM_RECORDS); + aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); + assertNotNull(aggregationGroupByResult); + + for (int i = 0; i < NUM_GROUPS; i++) { + CovarianceTuple actualCovTuple = (CovarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i); + CovarianceTuple expectedCovTuple = _expectedGroupByResultVer2[i]; + checkWithPrecision(actualCovTuple, expectedCovTuple); + } + + // Inter Segment with 4 identical segments + _useIdenticalSegment = true; + brokerResponse = getBrokerResponse(query); + checkGroupByResults(brokerResponse, _expectedFinalResultVer2); + _useIdenticalSegment = false; + // Inter Segment with 4 distinct segments + brokerResponse = getBrokerResponse(query); + checkGroupByResults(brokerResponse, _expectedFinalResultVer2); + } + + private void checkWithPrecision(CovarianceTuple tuple, double sumX, double sumY, double sumXY, int count) { + assertEquals(tuple.getCount(), count); + assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumX(), sumX, RELATIVE_EPSILON)); + assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumY(), sumY, RELATIVE_EPSILON)); + assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumXY(), sumXY, RELATIVE_EPSILON)); + } + + private void checkWithPrecision(CovarianceTuple actual, CovarianceTuple expected) { + checkWithPrecision(actual, expected.getSumX(), expected.getSumY(), expected.getSumXY(), (int) expected.getCount()); + } + + private void checkResultTableWithPrecision(BrokerResponseNative brokerResponse) { + Object[] results = brokerResponse.getResultTable().getRows().get(0); + assertEquals(results.length, 8); + assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], _expectedCovIntXY, RELATIVE_EPSILON)); + assertTrue(Precision.equalsWithRelativeTolerance((double) results[1], _expectedCovDoubleXY, RELATIVE_EPSILON)); + assertTrue(Precision.equalsWithRelativeTolerance((double) results[2], _expectedCovIntDouble, RELATIVE_EPSILON)); + assertTrue(Precision.equalsWithRelativeTolerance((double) results[3], _expectedCovIntLong, RELATIVE_EPSILON)); + assertTrue(Precision.equalsWithRelativeTolerance((double) results[4], _expectedCovIntFloat, RELATIVE_EPSILON)); + assertTrue(Precision.equalsWithRelativeTolerance((double) results[5], _expectedCovDoubleLong, RELATIVE_EPSILON)); + assertTrue(Precision.equalsWithRelativeTolerance((double) results[6], _expectedCovDoubleFloat, RELATIVE_EPSILON)); + assertTrue(Precision.equalsWithRelativeTolerance((double) results[7], _expectedCovLongFloat, RELATIVE_EPSILON)); + } + + private void checkGroupByResults(BrokerResponseNative brokerResponse, double[] expectedResults) { + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + for (int i = 0; i < NUM_GROUPS; i++) { + assertTrue(Precision.equals((double) rows.get(i)[0], expectedResults[i], DELTA)); + } + } + + @AfterClass + public void tearDown() + throws IOException { + _indexSegment.destroy(); + for (List<IndexSegment> indexList : _distinctInstances) { + for (IndexSegment seg : indexList) { + seg.destroy(); + } + } + FileUtils.deleteDirectory(INDEX_DIR); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java deleted file mode 100644 index d61dcaa18f..0000000000 --- a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java +++ /dev/null @@ -1,749 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pinot.queries; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import org.apache.commons.io.FileUtils; -import org.apache.commons.math3.stat.correlation.Covariance; -import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; -import org.apache.commons.math3.stat.descriptive.moment.Variance; -import org.apache.commons.math3.util.Precision; -import org.apache.pinot.common.response.broker.BrokerResponseNative; -import org.apache.pinot.common.response.broker.ResultTable; -import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; -import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; -import org.apache.pinot.core.operator.query.AggregationOperator; -import org.apache.pinot.core.operator.query.GroupByOperator; -import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; -import org.apache.pinot.segment.local.customobject.CovarianceTuple; -import org.apache.pinot.segment.local.customobject.VarianceTuple; -import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; -import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; -import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; -import org.apache.pinot.segment.spi.ImmutableSegment; -import org.apache.pinot.segment.spi.IndexSegment; -import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.utils.ReadMode; -import org.apache.pinot.spi.utils.builder.TableConfigBuilder; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - - -/** - * Queries test for statistical queries (i.e Variance, Covariance, Standard Deviation etc) - */ -public class StatisticalQueriesTest extends BaseQueriesTest { - private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "CovarianceQueriesTest"); - private static final String RAW_TABLE_NAME = "testTable"; - private static final String SEGMENT_NAME = "testSegment"; - - // test segments 1-4 evenly divide testSegment into 4 distinct segments - private static final String SEGMENT_NAME_1 = "testSegment1"; - private static final String SEGMENT_NAME_2 = "testSegment2"; - private static final String SEGMENT_NAME_3 = "testSegment3"; - private static final String SEGMENT_NAME_4 = "testSegment4"; - - private static final int NUM_RECORDS = 2000; - private static final int NUM_GROUPS = 10; - private static final int MAX_VALUE = 500; - private static final double RELATIVE_EPSILON = 0.0001; - private static final double DELTA = 0.0001; - - private static final String INT_COLUMN_X = "intColumnX"; - private static final String INT_COLUMN_Y = "intColumnY"; - private static final String DOUBLE_COLUMN_X = "doubleColumnX"; - private static final String DOUBLE_COLUMN_Y = "doubleColumnY"; - private static final String LONG_COLUMN = "longColumn"; - private static final String FLOAT_COLUMN = "floatColumn"; - private static final String GROUP_BY_COLUMN = "groupByColumn"; - - private static final Schema SCHEMA = - new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN_X, FieldSpec.DataType.INT) - .addSingleValueDimension(INT_COLUMN_Y, FieldSpec.DataType.INT) - .addSingleValueDimension(DOUBLE_COLUMN_X, FieldSpec.DataType.DOUBLE) - .addSingleValueDimension(DOUBLE_COLUMN_Y, FieldSpec.DataType.DOUBLE) - .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG) - .addSingleValueDimension(FLOAT_COLUMN, FieldSpec.DataType.FLOAT) - .addSingleValueDimension(GROUP_BY_COLUMN, FieldSpec.DataType.DOUBLE).build(); - private static final TableConfig TABLE_CONFIG = - new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); - - private IndexSegment _indexSegment; - private List<IndexSegment> _indexSegments; - private List<List<IndexSegment>> _distinctInstances; - private int _sumIntX = 0; - private int _sumIntY = 0; - private int _sumIntXY = 0; - - private double _sumDoubleX = 0; - private double _sumDoubleY = 0; - private double _sumDoubleXY = 0; - - private long _sumLong = 0L; - private double _sumFloat = 0; - - private double _sumIntDouble = 0; - private long _sumIntLong = 0L; - private double _sumIntFloat = 0; - private double _sumDoubleLong = 0; - private double _sumDoubleFloat = 0; - private double _sumLongFloat = 0; - - private double _expectedCovIntXY; - private double _expectedCovDoubleXY; - private double _expectedCovIntDouble; - private double _expectedCovIntLong; - private double _expectedCovIntFloat; - private double _expectedCovDoubleLong; - private double _expectedCovDoubleFloat; - private double _expectedCovLongFloat; - - private double _expectedCovWithFilter; - - private final CovarianceTuple[] _expectedGroupByResultVer1 = new CovarianceTuple[NUM_GROUPS]; - private final CovarianceTuple[] _expectedGroupByResultVer2 = new CovarianceTuple[NUM_GROUPS]; - private final double[] _expectedFinalResultVer1 = new double[NUM_GROUPS]; - private final double[] _expectedFinalResultVer2 = new double[NUM_GROUPS]; - - private boolean _useIdenticalSegment = false; - - int[] _intColX = new int[NUM_RECORDS]; - int[] _intColY = new int[NUM_RECORDS]; - long[] _longCol = new long[NUM_RECORDS]; - double[] _floatCol = new double[NUM_RECORDS]; - double[] _doubleColX = new double[NUM_RECORDS]; - double[] _doubleColY = new double[NUM_RECORDS]; - double[] _groupByCol = new double[NUM_RECORDS]; - - @Override - protected String getFilter() { - // filter out half of the rows based on group id - return " WHERE groupByColumn < " + (NUM_GROUPS / 2); - } - - @Override - protected IndexSegment getIndexSegment() { - return _indexSegment; - } - - @Override - protected List<IndexSegment> getIndexSegments() { - return _indexSegments; - } - - @Override - protected List<List<IndexSegment>> getDistinctInstances() { - if (_useIdenticalSegment) { - return Collections.singletonList(_indexSegments); - } - return _distinctInstances; - } - - @BeforeClass - public void setUp() - throws Exception { - FileUtils.deleteDirectory(INDEX_DIR); - - List<GenericRow> records = new ArrayList<>(NUM_RECORDS); - - Random rand = new Random(); - _intColX = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray(); - _intColY = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray(); - _doubleColX = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray(); - _doubleColY = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray(); - _longCol = rand.longs(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray(); - - int groupSize = NUM_RECORDS / NUM_GROUPS; - double sumX = 0; - double sumY = 0; - double sumGroupBy = 0; - double sumXY = 0; - double sumXGroupBy = 0; - int groupByVal = 0; - - for (int i = 0; i < NUM_RECORDS; i++) { - GenericRow record = new GenericRow(); - int intX = _intColX[i]; - int intY = _intColY[i]; - double doubleX = _doubleColX[i]; - double doubleY = _doubleColY[i]; - long longVal = _longCol[i]; - float floatVal = -MAX_VALUE + rand.nextFloat() * 2 * MAX_VALUE; - - // set up inner segment group by results - groupByVal = (int) Math.floor(i / groupSize); - if (i % groupSize == 0 && groupByVal > 0) { - _expectedGroupByResultVer1[groupByVal - 1] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize); - _expectedGroupByResultVer2[groupByVal - 1] = new CovarianceTuple(sumX, sumY, sumXY, groupSize); - sumX = 0; - sumY = 0; - sumGroupBy = 0; - sumXY = 0; - sumXGroupBy = 0; - } - - sumX += doubleX; - sumY += doubleY; - sumGroupBy += groupByVal; - sumXY += doubleX * doubleY; - sumXGroupBy += doubleX * groupByVal; - - _floatCol[i] = floatVal; - _groupByCol[i] = groupByVal; - - // calculate inner segment results - _sumIntX += intX; - _sumIntY += intY; - _sumDoubleX += doubleX; - _sumDoubleY += doubleY; - _sumLong += longVal; - _sumFloat += floatVal; - _sumIntXY += intX * intY; - _sumDoubleXY += doubleX * doubleY; - _sumIntDouble += intX * doubleX; - _sumIntLong += intX * longVal; - _sumIntFloat += intX * _floatCol[i]; - _sumDoubleLong += doubleX * longVal; - _sumDoubleFloat += doubleX * _floatCol[i]; - _sumLongFloat += longVal * _floatCol[i]; - - record.putValue(INT_COLUMN_X, intX); - record.putValue(INT_COLUMN_Y, intY); - record.putValue(DOUBLE_COLUMN_X, doubleX); - record.putValue(DOUBLE_COLUMN_Y, doubleY); - record.putValue(LONG_COLUMN, longVal); - record.putValue(FLOAT_COLUMN, floatVal); - record.putValue(GROUP_BY_COLUMN, groupByVal); - records.add(record); - } - _expectedGroupByResultVer1[groupByVal] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize); - _expectedGroupByResultVer2[groupByVal] = new CovarianceTuple(sumX, sumY, sumXY, groupSize); - - // calculate inter segment result - Covariance cov = new Covariance(); - double[] newIntColX = Arrays.stream(_intColX).asDoubleStream().toArray(); - double[] newIntColY = Arrays.stream(_intColY).asDoubleStream().toArray(); - double[] newLongCol = Arrays.stream(_longCol).asDoubleStream().toArray(); - _expectedCovIntXY = cov.covariance(newIntColX, newIntColY, false); - _expectedCovDoubleXY = cov.covariance(_doubleColX, _doubleColY, false); - _expectedCovIntDouble = cov.covariance(newIntColX, _doubleColX, false); - _expectedCovIntLong = cov.covariance(newIntColX, newLongCol, false); - _expectedCovIntFloat = cov.covariance(newIntColX, _floatCol, false); - _expectedCovDoubleLong = cov.covariance(_doubleColX, newLongCol, false); - _expectedCovDoubleFloat = cov.covariance(_doubleColX, _floatCol, false); - _expectedCovLongFloat = cov.covariance(newLongCol, _floatCol, false); - - double[] filteredX = Arrays.copyOfRange(_doubleColX, 0, NUM_RECORDS / 2); - double[] filteredY = Arrays.copyOfRange(_doubleColY, 0, NUM_RECORDS / 2); - _expectedCovWithFilter = cov.covariance(filteredX, filteredY, false); - - // calculate inter segment group by results - for (int i = 0; i < NUM_GROUPS; i++) { - double[] colX = Arrays.copyOfRange(_doubleColX, i * groupSize, (i + 1) * groupSize); - double[] colGroupBy = Arrays.copyOfRange(_groupByCol, i * groupSize, (i + 1) * groupSize); - double[] colY = Arrays.copyOfRange(_doubleColY, i * groupSize, (i + 1) * groupSize); - _expectedFinalResultVer1[i] = cov.covariance(colX, colGroupBy, false); - _expectedFinalResultVer2[i] = cov.covariance(colX, colY, false); - } - - // generate testSegment - ImmutableSegment immutableSegment = setUpSingleSegment(records, SEGMENT_NAME); - _indexSegment = immutableSegment; - _indexSegments = Arrays.asList(immutableSegment, immutableSegment); - - // divide testSegment into 4 distinct segments for distinct inter segment tests - // by doing so, we can avoid calculating global covariance again - _distinctInstances = new ArrayList<>(); - int segmentSize = NUM_RECORDS / 4; - ImmutableSegment immutableSegment1 = setUpSingleSegment(records.subList(0, segmentSize), SEGMENT_NAME_1); - ImmutableSegment immutableSegment2 = - setUpSingleSegment(records.subList(segmentSize, segmentSize * 2), SEGMENT_NAME_2); - ImmutableSegment immutableSegment3 = - setUpSingleSegment(records.subList(segmentSize * 2, segmentSize * 3), SEGMENT_NAME_3); - ImmutableSegment immutableSegment4 = - setUpSingleSegment(records.subList(segmentSize * 3, NUM_RECORDS), SEGMENT_NAME_4); - // generate 2 instances each with 2 distinct segments - _distinctInstances.add(Arrays.asList(immutableSegment1, immutableSegment2)); - _distinctInstances.add(Arrays.asList(immutableSegment3, immutableSegment4)); - } - - private ImmutableSegment setUpSingleSegment(List<GenericRow> recordSet, String segmentName) - throws Exception { - SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); - segmentGeneratorConfig.setTableName(RAW_TABLE_NAME); - segmentGeneratorConfig.setSegmentName(segmentName); - segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath()); - - SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); - driver.init(segmentGeneratorConfig, new GenericRowRecordReader(recordSet)); - driver.build(); - - return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap); - } - - @Test - public void testCovarianceAggregationOnly() { - // Inner Segment - String query = - "SELECT COVAR_POP(intColumnX, intColumnY), COVAR_POP(doubleColumnX, doubleColumnY), COVAR_POP(intColumnX, " - + "doubleColumnX), " + "COVAR_POP(intColumnX, longColumn), COVAR_POP(intColumnX, floatColumn), " - + "COVAR_POP(doubleColumnX, longColumn), COVAR_POP(doubleColumnX, floatColumn), COVAR_POP(longColumn, " - + "floatColumn) FROM testTable"; - AggregationOperator aggregationOperator = getOperator(query); - AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock(); - QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0, - NUM_RECORDS * 6, NUM_RECORDS); - List<Object> aggregationResult = resultsBlock.getResults(); - assertNotNull(aggregationResult); - checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(0), _sumIntX, _sumIntY, _sumIntXY, - NUM_RECORDS); - checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(1), _sumDoubleX, _sumDoubleY, _sumDoubleXY, - NUM_RECORDS); - checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(2), _sumIntX, _sumDoubleX, _sumIntDouble, - NUM_RECORDS); - checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(3), _sumIntX, _sumLong, _sumIntLong, - NUM_RECORDS); - checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(4), _sumIntX, _sumFloat, _sumIntFloat, - NUM_RECORDS); - checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(5), _sumDoubleX, _sumLong, _sumDoubleLong, - NUM_RECORDS); - checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(6), _sumDoubleX, _sumFloat, _sumDoubleFloat, - NUM_RECORDS); - checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(7), _sumLong, _sumFloat, _sumLongFloat, - NUM_RECORDS); - - // Inter segments with 4 identical segments (2 instances each having 2 identical segments) - _useIdenticalSegment = true; - BrokerResponseNative brokerResponse = getBrokerResponse(query); - _useIdenticalSegment = false; - assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS); - assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); - assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 6 * NUM_RECORDS); - assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS); - checkResultTableWithPrecisionForCovariance(brokerResponse); - - // Inter segments with 4 distinct segments (2 instances each having 2 distinct segments) - brokerResponse = getBrokerResponse(query); - assertEquals(brokerResponse.getNumDocsScanned(), NUM_RECORDS); - assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); - assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 6 * NUM_RECORDS); - assertEquals(brokerResponse.getTotalDocs(), NUM_RECORDS); - checkResultTableWithPrecisionForCovariance(brokerResponse); - - // Inter segments with 4 identical segments with filter - _useIdenticalSegment = true; - query = "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable" + getFilter(); - brokerResponse = getBrokerResponse(query); - _useIdenticalSegment = false; - assertEquals(brokerResponse.getNumDocsScanned(), 2 * NUM_RECORDS); - assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); - assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * NUM_RECORDS); - assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS); - Object[] results = brokerResponse.getResultTable().getRows().get(0); - assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], _expectedCovWithFilter, RELATIVE_EPSILON)); - } - - @Test - public void testCovarianceAggregationGroupBy() { - // Inner Segment - // case 1: (col1, groupByCol) group by groupByCol => all covariances are 0's - String query = - "SELECT COVAR_POP(doubleColumnX, groupByColumn) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn"; - GroupByOperator groupByOperator = getOperator(query); - GroupByResultsBlock resultsBlock = groupByOperator.nextBlock(); - QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0, - NUM_RECORDS * 2, NUM_RECORDS); - AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); - assertNotNull(aggregationGroupByResult); - for (int i = 0; i < NUM_GROUPS; i++) { - CovarianceTuple actualCovTuple = (CovarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i); - CovarianceTuple expectedCovTuple = _expectedGroupByResultVer1[i]; - checkWithPrecisionForCovariance(actualCovTuple, expectedCovTuple); - } - - // Inter Segment with 4 identical segments - _useIdenticalSegment = true; - BrokerResponseNative brokerResponse = getBrokerResponse(query); - checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer1); - _useIdenticalSegment = false; - // Inter Segment with 4 distinct segments - brokerResponse = getBrokerResponse(query); - checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer1); - - // Inner Segment - // case 2: COVAR_POP(col1, col2) group by groupByCol => nondeterministic cov - query = - "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn"; - groupByOperator = getOperator(query); - resultsBlock = groupByOperator.nextBlock(); - QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0, - NUM_RECORDS * 3, NUM_RECORDS); - aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); - assertNotNull(aggregationGroupByResult); - - for (int i = 0; i < NUM_GROUPS; i++) { - CovarianceTuple actualCovTuple = (CovarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i); - CovarianceTuple expectedCovTuple = _expectedGroupByResultVer2[i]; - checkWithPrecisionForCovariance(actualCovTuple, expectedCovTuple); - } - - // Inter Segment with 4 identical segments - _useIdenticalSegment = true; - brokerResponse = getBrokerResponse(query); - checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer2); - _useIdenticalSegment = false; - // Inter Segment with 4 distinct segments - brokerResponse = getBrokerResponse(query); - checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer2); - } - - @Test - public void testVarianceAggregationOnly() { - // Compute the expected values - Variance[] expectedVariances = new Variance[8]; - for (int i = 0; i < 8; i++) { - if (i < 4) { - expectedVariances[i] = new Variance(false); - } else { - expectedVariances[i] = new Variance(true); - } - } - for (int i = 0; i < NUM_RECORDS; i++) { - expectedVariances[0].increment(_intColX[i]); - expectedVariances[1].increment(_longCol[i]); - expectedVariances[2].increment(_floatCol[i]); - expectedVariances[3].increment(_doubleColX[i]); - expectedVariances[4].increment(_intColX[i]); - expectedVariances[5].increment(_longCol[i]); - expectedVariances[6].increment(_floatCol[i]); - expectedVariances[7].increment(_doubleColX[i]); - } - double expectedIntSum = Arrays.stream(_intColX).asDoubleStream().sum(); - double expectedLongSum = Arrays.stream(_longCol).asDoubleStream().sum(); - double expectedFloatSum = 0.0; - for (int i = 0; i < _floatCol.length; i++) { - expectedFloatSum += _floatCol[i]; - } - double expectedDoubleSum = Arrays.stream(_doubleColX).sum(); - - // Compute the query - String query = "SELECT VAR_POP(intColumnX), VAR_POP(longColumn), VAR_POP(floatColumn), VAR_POP(doubleColumnX)," - + "VAR_SAMP(intColumnX), VAR_SAMP(longColumn), VAR_SAMP(floatColumn), VAR_SAMP(doubleColumnX) FROM testTable"; - AggregationOperator aggregationOperator = getOperator(query); - AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock(); - QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0, - NUM_RECORDS * 4, NUM_RECORDS); - List<Object> aggregationResult = resultsBlock.getResults(); - - // Validate the aggregation results - checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(0), NUM_RECORDS, expectedIntSum, - expectedVariances[0].getResult(), false); - checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(1), NUM_RECORDS, expectedLongSum, - expectedVariances[1].getResult(), false); - checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(2), NUM_RECORDS, expectedFloatSum, - expectedVariances[2].getResult(), false); - checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(3), NUM_RECORDS, expectedDoubleSum, - expectedVariances[3].getResult(), false); - checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(4), NUM_RECORDS, expectedIntSum, - expectedVariances[4].getResult(), true); - checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(5), NUM_RECORDS, expectedLongSum, - expectedVariances[5].getResult(), true); - checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(6), NUM_RECORDS, expectedFloatSum, - expectedVariances[6].getResult(), true); - checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(7), NUM_RECORDS, expectedDoubleSum, - expectedVariances[7].getResult(), true); - - // Validate the response - BrokerResponseNative brokerResponse = getBrokerResponse(query); - Object[] results = brokerResponse.getResultTable().getRows().get(0); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[0], expectedVariances[0].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[1], expectedVariances[1].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[2], expectedVariances[2].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[3], expectedVariances[3].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[4], expectedVariances[4].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[5], expectedVariances[5].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[6], expectedVariances[6].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[7], expectedVariances[7].getResult(), RELATIVE_EPSILON)); - - VarianceTuple test = ((VarianceTuple) aggregationResult.get(0)); - test.apply((new VarianceTuple(0, 0, 0.0d))); - System.out.println(test.getM2()); - // Validate the response for a query with a filter - query = "SELECT VAR_POP(intColumnX) from testTable" + getFilter(); - brokerResponse = getBrokerResponse(query); - brokerResponse.getResultTable(); - results = brokerResponse.getResultTable().getRows().get(0); - Variance filterExpectedVariance = new Variance(false); - for (int i = 0; i < NUM_RECORDS / 2; i++) { - filterExpectedVariance.increment(_intColX[i]); - } - assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], filterExpectedVariance.getResult(), - RELATIVE_EPSILON)); - } - - @Test - public void testVarianceAggregationGroupBy() { - // Compute expected group results - Variance[] expectedGroupByResult = new Variance[NUM_GROUPS]; - double[] expectedSum = new double[NUM_GROUPS]; - - for (int i = 0; i < NUM_GROUPS; i++) { - expectedGroupByResult[i] = new Variance(false); - } - for (int j = 0; j < NUM_RECORDS; j++) { - int pos = j / (NUM_RECORDS / NUM_GROUPS); - expectedGroupByResult[pos].increment(_intColX[j]); - expectedSum[pos] += _intColX[j]; - } - - String query = "SELECT VAR_POP(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn"; - GroupByOperator groupByOperator = getOperator(query); - GroupByResultsBlock resultsBlock = groupByOperator.nextBlock(); - QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0, - NUM_RECORDS * 2, NUM_RECORDS); - AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); - assertNotNull(aggregationGroupByResult); - for (int i = 0; i < NUM_GROUPS; i++) { - - VarianceTuple actualVarianceTuple = (VarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i); - checkWithPrecisionForVariance(actualVarianceTuple, NUM_RECORDS / NUM_GROUPS, expectedSum[i], - expectedGroupByResult[i].getResult(), false); - } - } - - @Test - public void testStandardDeviationAggregationOnly() { - // Compute the expected values - StandardDeviation[] expectedStdDevs = new StandardDeviation[8]; - for (int i = 0; i < 8; i++) { - if (i < 4) { - expectedStdDevs[i] = new StandardDeviation(false); - } else { - expectedStdDevs[i] = new StandardDeviation(true); - } - } - for (int i = 0; i < NUM_RECORDS; i++) { - expectedStdDevs[0].increment(_intColX[i]); - expectedStdDevs[1].increment(_longCol[i]); - expectedStdDevs[2].increment(_floatCol[i]); - expectedStdDevs[3].increment(_doubleColX[i]); - expectedStdDevs[4].increment(_intColX[i]); - expectedStdDevs[5].increment(_longCol[i]); - expectedStdDevs[6].increment(_floatCol[i]); - expectedStdDevs[7].increment(_doubleColX[i]); - } - - double expectedIntSum = Arrays.stream(_intColX).asDoubleStream().sum(); - double expectedLongSum = Arrays.stream(_longCol).asDoubleStream().sum(); - double expectedFloatSum = 0.0; - for (int i = 0; i < _floatCol.length; i++) { - expectedFloatSum += _floatCol[i]; - } - double expectedDoubleSum = Arrays.stream(_doubleColX).sum(); - - // Compute the query - String query = - "SELECT STDDEV_POP(intColumnX), STDDEV_POP(longColumn), STDDEV_POP(floatColumn), STDDEV_POP(doubleColumnX)," - + "STDDEV_SAMP(intColumnX), STDDEV_SAMP(longColumn), STDDEV_SAMP(floatColumn), STDDEV_SAMP(doubleColumnX) " - + "FROM testTable"; - AggregationOperator aggregationOperator = getOperator(query); - AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock(); - QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0, - NUM_RECORDS * 4, NUM_RECORDS); - List<Object> aggregationResult = resultsBlock.getResults(); - - // Validate the aggregation results - checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(0), NUM_RECORDS, expectedIntSum, - expectedStdDevs[0].getResult(), false); - checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(1), NUM_RECORDS, expectedLongSum, - expectedStdDevs[1].getResult(), false); - checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(2), NUM_RECORDS, expectedFloatSum, - expectedStdDevs[2].getResult(), false); - checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(3), NUM_RECORDS, expectedDoubleSum, - expectedStdDevs[3].getResult(), false); - checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(4), NUM_RECORDS, expectedIntSum, - expectedStdDevs[4].getResult(), true); - checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(5), NUM_RECORDS, expectedLongSum, - expectedStdDevs[5].getResult(), true); - checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(6), NUM_RECORDS, expectedFloatSum, - expectedStdDevs[6].getResult(), true); - checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(7), NUM_RECORDS, expectedDoubleSum, - expectedStdDevs[7].getResult(), true); - - // Validate the response - BrokerResponseNative brokerResponse = getBrokerResponse(query); - brokerResponse.getResultTable(); - Object[] results = brokerResponse.getResultTable().getRows().get(0); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[0], expectedStdDevs[0].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[1], expectedStdDevs[1].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[2], expectedStdDevs[2].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[3], expectedStdDevs[3].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[4], expectedStdDevs[4].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[5], expectedStdDevs[5].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[6], expectedStdDevs[6].getResult(), RELATIVE_EPSILON)); - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[7], expectedStdDevs[7].getResult(), RELATIVE_EPSILON)); - - // Validate the response for a query with a filter - query = "SELECT STDDEV_POP(intColumnX) from testTable" + getFilter(); - brokerResponse = getBrokerResponse(query); - brokerResponse.getResultTable(); - results = brokerResponse.getResultTable().getRows().get(0); - StandardDeviation filterExpectedStdDev = new StandardDeviation(false); - for (int i = 0; i < NUM_RECORDS / 2; i++) { - filterExpectedStdDev.increment(_intColX[i]); - } - assertTrue( - Precision.equalsWithRelativeTolerance((double) results[0], filterExpectedStdDev.getResult(), RELATIVE_EPSILON)); - } - - @Test - public void testStandardDeviationAggreagtionGroupBy() { - // Compute expected group results - StandardDeviation[] expectedGroupByResult = new StandardDeviation[NUM_GROUPS]; - double[] expectedSum = new double[NUM_GROUPS]; - - for (int i = 0; i < NUM_GROUPS; i++) { - expectedGroupByResult[i] = new StandardDeviation(false); - } - for (int j = 0; j < NUM_RECORDS; j++) { - int pos = j / (NUM_RECORDS / NUM_GROUPS); - expectedGroupByResult[pos].increment(_intColX[j]); - expectedSum[pos] += _intColX[j]; - } - - String query = "SELECT STDDEV_POP(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn"; - GroupByOperator groupByOperator = getOperator(query); - GroupByResultsBlock resultsBlock = groupByOperator.nextBlock(); - QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0, - NUM_RECORDS * 2, NUM_RECORDS); - AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); - assertNotNull(aggregationGroupByResult); - for (int i = 0; i < NUM_GROUPS; i++) { - VarianceTuple actualVarianceTuple = (VarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i); - checkWithPrecisionForStandardDeviation(actualVarianceTuple, NUM_RECORDS / NUM_GROUPS, expectedSum[i], - expectedGroupByResult[i].getResult(), false); - } - } - - private void checkWithPrecisionForCovariance(CovarianceTuple tuple, double sumX, double sumY, double sumXY, - int count) { - assertEquals(tuple.getCount(), count); - assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumX(), sumX, RELATIVE_EPSILON)); - assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumY(), sumY, RELATIVE_EPSILON)); - assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumXY(), sumXY, RELATIVE_EPSILON)); - } - - private void checkWithPrecisionForCovariance(CovarianceTuple actual, CovarianceTuple expected) { - checkWithPrecisionForCovariance(actual, expected.getSumX(), expected.getSumY(), expected.getSumXY(), - (int) expected.getCount()); - } - - private void checkResultTableWithPrecisionForCovariance(BrokerResponseNative brokerResponse) { - Object[] results = brokerResponse.getResultTable().getRows().get(0); - assertEquals(results.length, 8); - assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], _expectedCovIntXY, RELATIVE_EPSILON)); - assertTrue(Precision.equalsWithRelativeTolerance((double) results[1], _expectedCovDoubleXY, RELATIVE_EPSILON)); - assertTrue(Precision.equalsWithRelativeTolerance((double) results[2], _expectedCovIntDouble, RELATIVE_EPSILON)); - assertTrue(Precision.equalsWithRelativeTolerance((double) results[3], _expectedCovIntLong, RELATIVE_EPSILON)); - assertTrue(Precision.equalsWithRelativeTolerance((double) results[4], _expectedCovIntFloat, RELATIVE_EPSILON)); - assertTrue(Precision.equalsWithRelativeTolerance((double) results[5], _expectedCovDoubleLong, RELATIVE_EPSILON)); - assertTrue(Precision.equalsWithRelativeTolerance((double) results[6], _expectedCovDoubleFloat, RELATIVE_EPSILON)); - assertTrue(Precision.equalsWithRelativeTolerance((double) results[7], _expectedCovLongFloat, RELATIVE_EPSILON)); - } - - private void checkGroupByResultsForCovariance(BrokerResponseNative brokerResponse, double[] expectedResults) { - ResultTable resultTable = brokerResponse.getResultTable(); - List<Object[]> rows = resultTable.getRows(); - for (int i = 0; i < NUM_GROUPS; i++) { - assertTrue(Precision.equals((double) rows.get(i)[0], expectedResults[i], DELTA)); - } - } - - private void checkWithPrecisionForVariance(VarianceTuple tuple, int expectedCount, double expectedSum, - double expectedVariance, boolean isBiasCorrected) { - assertEquals(tuple.getCount(), expectedCount); - assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSum(), expectedSum, RELATIVE_EPSILON)); - if (!isBiasCorrected) { - assertTrue( - Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedVariance * expectedCount, RELATIVE_EPSILON)); - } else { - assertTrue(Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedVariance * (expectedCount - 1), - RELATIVE_EPSILON)); - } - } - - private void checkWithPrecisionForStandardDeviation(VarianceTuple tuple, int expectedCount, double expectedSum, - double expectedStdDev, boolean isBiasCorrected) { - assertEquals(tuple.getCount(), expectedCount); - assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSum(), expectedSum, RELATIVE_EPSILON)); - if (!isBiasCorrected) { - assertTrue(Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedStdDev * expectedStdDev * expectedCount, - RELATIVE_EPSILON)); - } else { - assertTrue( - Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedStdDev * expectedStdDev * (expectedCount - 1), - RELATIVE_EPSILON)); - } - } - - @AfterClass - public void tearDown() - throws IOException { - _indexSegment.destroy(); - for (List<IndexSegment> indexList : _distinctInstances) { - for (IndexSegment seg : indexList) { - seg.destroy(); - } - } - FileUtils.deleteDirectory(INDEX_DIR); - } -} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/VarianceTuple.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/VarianceTuple.java deleted file mode 100644 index 09594364b4..0000000000 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/VarianceTuple.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.segment.local.customobject; - -import java.nio.ByteBuffer; - - -public class VarianceTuple implements Comparable<VarianceTuple> { - private long _count; - private double _sum; - private double _m2; - - public VarianceTuple(long count, double sum, double m2) { - _count = count; - _sum = sum; - _m2 = m2; - } - - public void apply(long count, double sum, double m2) { - if (count == 0) { - return; - } - double delta = (sum / count) - (_sum / _count); - _m2 += m2 + delta * delta * count * _count / (count + _count); - _count += count; - _sum += sum; - } - - public void apply(VarianceTuple varianceTuple) { - if (varianceTuple._count == 0) { - return; - } - double delta = (varianceTuple._sum / varianceTuple._count) - (_sum / _count); - _m2 += varianceTuple._m2 + delta * delta * varianceTuple._count * _count / (varianceTuple._count + _count); - _count += varianceTuple._count; - _sum += varianceTuple._sum; - } - - public long getCount() { - return _count; - } - - public double getSum() { - return _sum; - } - - public double getM2() { - return _m2; - } - - public byte[] toBytes() { - ByteBuffer byteBuffer = ByteBuffer.allocate(Double.BYTES * 2 + Long.BYTES); - byteBuffer.putLong(_count); - byteBuffer.putDouble(_sum); - byteBuffer.putDouble(_m2); - return byteBuffer.array(); - } - - public static VarianceTuple fromBytes(byte[] bytes) { - return fromByteBuffer(ByteBuffer.wrap(bytes)); - } - - public static VarianceTuple fromByteBuffer(ByteBuffer byteBuffer) { - return new VarianceTuple(byteBuffer.getLong(), byteBuffer.getDouble(), byteBuffer.getDouble()); - } - - @Override - public int compareTo(VarianceTuple varianceTuple) { - if (_count == 0) { - if (varianceTuple._count == 0) { - return 0; - } else { - return -1; - } - } else { - if (varianceTuple._count == 0) { - return 1; - } else { - if (_m2 > varianceTuple._m2) { - return 1; - } - if (_m2 < varianceTuple._m2) { - return -1; - } - return 0; - } - } - } -} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index 584c99a7b2..19d2f4d6e5 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -59,10 +59,6 @@ public enum AggregationFunctionType { HISTOGRAM("histogram"), COVARPOP("covarPop"), COVARSAMP("covarSamp"), - VARPOP("varPop"), - VARSAMP("varSamp"), - STDDEVPOP("stdDevPop"), - STDDEVSAMP("stdDevSamp"), // Geo aggregation functions STUNION("STUnion"), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
