This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6303fc9940 support SKEW_POP and KURTOSIS_POP aggregates (#10021)
6303fc9940 is described below
commit 6303fc9940b77358d259607e8328a13c5577773a
Author: Almog Gavra <[email protected]>
AuthorDate: Thu Dec 22 18:05:06 2022 -0800
support SKEW_POP and KURTOSIS_POP aggregates (#10021)
* support SKEW_POP and KURTOSIS_POP aggregates
* address feedback
* address feedback 2
---
.../apache/pinot/core/common/ObjectSerDeUtils.java | 27 +++-
.../function/AggregationFunctionFactory.java | 4 +
.../function/FourthMomentAggregationFunction.java | 166 +++++++++++++++++++
.../function/AggregationFunctionFactoryTest.java | 14 ++
.../pinot/queries/StatisticalQueriesTest.java | 179 +++++++++++++++++++++
.../local/customobject/PinotFourthMoment.java | 135 ++++++++++++++++
.../local/customobject/PinotFourthMomentTest.java | 102 ++++++++++++
.../pinot/segment/spi/AggregationFunctionType.java | 2 +
8 files changed, 627 insertions(+), 2 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 6fbacc5fcd..fbfee474e9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -70,6 +70,7 @@ import
org.apache.pinot.segment.local.customobject.FloatLongPair;
import org.apache.pinot.segment.local.customobject.IntLongPair;
import org.apache.pinot.segment.local.customobject.LongLongPair;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
+import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.local.customobject.StringLongPair;
import org.apache.pinot.segment.local.customobject.VarianceTuple;
@@ -125,7 +126,8 @@ public class ObjectSerDeUtils {
DoubleLongPair(30),
StringLongPair(31),
CovarianceTuple(32),
- VarianceTuple(33);
+ VarianceTuple(33),
+ PinotFourthMoment(34);
private final int _value;
@@ -209,6 +211,8 @@ public class ObjectSerDeUtils {
return ObjectType.CovarianceTuple;
} else if (value instanceof VarianceTuple) {
return ObjectType.VarianceTuple;
+ } else if (value instanceof PinotFourthMoment) {
+ return ObjectType.PinotFourthMoment;
} else {
throw new IllegalArgumentException("Unsupported type of value: " +
value.getClass().getSimpleName());
}
@@ -483,6 +487,24 @@ public class ObjectSerDeUtils {
}
};
+ // SerDe for PinotFourthMoment: every method delegates to the object's own
+ // serialize() / fromBytes(...) implementation, so the wire format is defined there.
+ public static final ObjectSerDe<PinotFourthMoment>
PINOT_FOURTH_MOMENT_OBJECT_SER_DE
+ = new ObjectSerDe<PinotFourthMoment>() {
+ @Override
+ public byte[] serialize(PinotFourthMoment value) {
+ return value.serialize();
+ }
+
+ @Override
+ public PinotFourthMoment deserialize(byte[] bytes) {
+ return PinotFourthMoment.fromBytes(bytes);
+ }
+
+ @Override
+ public PinotFourthMoment deserialize(ByteBuffer byteBuffer) {
+ return PinotFourthMoment.fromBytes(byteBuffer);
+ }
+ };
+
public static final ObjectSerDe<HyperLogLog> HYPER_LOG_LOG_SER_DE = new
ObjectSerDe<HyperLogLog>() {
@Override
@@ -1213,7 +1235,8 @@ public class ObjectSerDeUtils {
DOUBLE_LONG_PAIR_SER_DE,
STRING_LONG_PAIR_SER_DE,
COVARIANCE_TUPLE_OBJECT_SER_DE,
- VARIANCE_TUPLE_OBJECT_SER_DE
+ VARIANCE_TUPLE_OBJECT_SER_DE,
+ PINOT_FOURTH_MOMENT_OBJECT_SER_DE
};
//@formatter:on
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 2758d193a2..4ba3413221 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -285,6 +285,10 @@ public class AggregationFunctionFactory {
return new VarianceAggregationFunction(firstArgument, false, true);
case STDDEVSAMP:
return new VarianceAggregationFunction(firstArgument, true, true);
+ case SKEWNESS:
+ return new FourthMomentAggregationFunction(firstArgument,
FourthMomentAggregationFunction.Type.SKEWNESS);
+ case KURTOSIS:
+ return new FourthMomentAggregationFunction(firstArgument,
FourthMomentAggregationFunction.Type.KURTOSIS);
default:
throw new IllegalArgumentException();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
new file mode 100644
index 0000000000..9cb06e4eeb
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.utils.StatisticalAggregationFunctionUtils;
+import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class FourthMomentAggregationFunction extends
BaseSingleInputAggregationFunction<PinotFourthMoment, Double> {
+
+ private final Type _type;
+
+ enum Type {
+ KURTOSIS, SKEWNESS
+ }
+
+ public FourthMomentAggregationFunction(ExpressionContext expression, Type
type) {
+ super(expression);
+ _type = type;
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ switch (_type) {
+ case KURTOSIS:
+ return AggregationFunctionType.KURTOSIS;
+ case SKEWNESS:
+ return AggregationFunctionType.SKEWNESS;
+ default:
+ throw new IllegalArgumentException("Unexpected type " + _type);
+ }
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return new ObjectAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ double[] values =
StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
+
+ PinotFourthMoment m4 = aggregationResultHolder.getResult();
+ if (m4 == null) {
+ m4 = new PinotFourthMoment();
+ aggregationResultHolder.setValue(m4);
+ }
+
+ for (int i = 0; i < length; i++) {
+ m4.increment(values[i]);
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ double[] values =
StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
+ for (int i = 0; i < length; i++) {
+ PinotFourthMoment m4 = groupByResultHolder.getResult(groupKeyArray[i]);
+ if (m4 == null) {
+ m4 = new PinotFourthMoment();
+ groupByResultHolder.setValueForKey(groupKeyArray[i], m4);
+ }
+ m4.increment(values[i]);
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ double[] values =
StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ PinotFourthMoment m4 = groupByResultHolder.getResult(groupKey);
+ if (m4 == null) {
+ m4 = new PinotFourthMoment();
+ groupByResultHolder.setValueForKey(groupKey, m4);
+ }
+ m4.increment(values[i]);
+ }
+ }
+ }
+
+ @Override
+ public PinotFourthMoment extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ PinotFourthMoment m4 = aggregationResultHolder.getResult();
+ if (m4 == null) {
+ return new PinotFourthMoment();
+ } else {
+ return m4;
+ }
+ }
+
+ @Override
+ public PinotFourthMoment extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ PinotFourthMoment m4 = groupByResultHolder.getResult(groupKey);
+ if (m4 == null) {
+ return new PinotFourthMoment();
+ } else {
+ return m4;
+ }
+ }
+
+ @Override
+ public PinotFourthMoment merge(PinotFourthMoment intermediateResult1,
PinotFourthMoment intermediateResult2) {
+ intermediateResult1.combine(intermediateResult2);
+ return intermediateResult1;
+ }
+
+ @Override
+ public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+ return DataSchema.ColumnDataType.OBJECT;
+ }
+
+ @Override
+ public DataSchema.ColumnDataType getFinalResultColumnType() {
+ return DataSchema.ColumnDataType.DOUBLE;
+ }
+
+ @Override
+ public Double extractFinalResult(PinotFourthMoment m4) {
+ if (m4 == null) {
+ return null;
+ }
+
+ switch (_type) {
+ case KURTOSIS:
+ return m4.kurtosis();
+ case SKEWNESS:
+ return m4.skew();
+ default:
+ throw new IllegalStateException("Unexpected value: " + _type);
+ }
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index 144949a0d5..a29694ef79 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -458,6 +458,20 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getType(),
AggregationFunctionType.BOOLOR);
assertEquals(aggregationFunction.getColumnName(), "boolOr_column");
assertEquals(aggregationFunction.getResultColumnName(), "boolor(column)");
+
+ function = getFunction("skewness");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof FourthMomentAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.SKEWNESS);
+ assertEquals(aggregationFunction.getColumnName(), "skewness_column");
+ assertEquals(aggregationFunction.getResultColumnName(),
"skewness(column)");
+
+ function = getFunction("kurtosis");
+ aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(function,
DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof FourthMomentAggregationFunction);
+ assertEquals(aggregationFunction.getType(),
AggregationFunctionType.KURTOSIS);
+ assertEquals(aggregationFunction.getColumnName(), "kurtosis_column");
+ assertEquals(aggregationFunction.getResultColumnName(),
"kurtosis(column)");
}
private FunctionContext getFunction(String functionName) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java
index 05defe18f0..9365006b35 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java
@@ -28,6 +28,8 @@ import java.util.List;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.commons.math3.stat.correlation.Covariance;
+import org.apache.commons.math3.stat.descriptive.moment.Kurtosis;
+import org.apache.commons.math3.stat.descriptive.moment.Skewness;
import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
import org.apache.commons.math3.stat.descriptive.moment.Variance;
import org.apache.commons.math3.util.Precision;
@@ -39,6 +41,7 @@ import
org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.GroupByOperator;
import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.segment.local.customobject.CovarianceTuple;
+import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
import org.apache.pinot.segment.local.customobject.VarianceTuple;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -705,6 +708,172 @@ public class StatisticalQueriesTest extends
BaseQueriesTest {
}
}
+  /**
+   * Verifies SKEWNESS without GROUP BY on the int, long, float and double columns: first at the operator level
+   * (intermediate {@link PinotFourthMoment} objects), then on the broker response, and finally with a filter.
+   */
+  @Test
+  public void testSkewAggregationOnly() {
+    // Compute the expected values, one accumulator per queried column (same order as the SELECT list)
+    Skewness[] expectedSkew = new Skewness[4];
+    for (int i = 0; i < 4; i++) {
+      expectedSkew[i] = new Skewness();
+    }
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      expectedSkew[0].increment(_intColX[i]);
+      expectedSkew[1].increment(_longCol[i]);
+      expectedSkew[2].increment(_floatCol[i]);
+      expectedSkew[3].increment(_doubleColX[i]);
+    }
+
+    // Compute the query
+    String query =
+        "SELECT SKEWNESS(intColumnX), SKEWNESS(longColumn), SKEWNESS(floatColumn), SKEWNESS(doubleColumnX) "
+            + "FROM testTable";
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        NUM_RECORDS * 4, NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    // Validate the aggregation results
+    for (int i = 0; i < expectedSkew.length; i++) {
+      checkWithPrecisionForSkew((PinotFourthMoment) aggregationResult.get(i), NUM_RECORDS,
+          expectedSkew[i].getResult());
+    }
+
+    // Validate the response
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    Object[] results = brokerResponse.getResultTable().getRows().get(0);
+    for (int i = 0; i < expectedSkew.length; i++) {
+      assertTrue(
+          Precision.equalsWithRelativeTolerance((double) results[i], expectedSkew[i].getResult(), RELATIVE_EPSILON));
+    }
+
+    // Validate the response for a query with a filter; the expected value is computed over the first
+    // NUM_RECORDS / 2 records
+    query = "SELECT SKEWNESS(intColumnX) from testTable" + getFilter();
+    brokerResponse = getBrokerResponse(query);
+    results = brokerResponse.getResultTable().getRows().get(0);
+    Skewness filterExpectedSkew = new Skewness();
+    for (int i = 0; i < NUM_RECORDS / 2; i++) {
+      filterExpectedSkew.increment(_intColX[i]);
+    }
+    assertTrue(
+        Precision.equalsWithRelativeTolerance((double) results[0], filterExpectedSkew.getResult(), RELATIVE_EPSILON));
+  }
+
+  /** Verifies SKEWNESS with GROUP BY at the operator level against per-group reference accumulators. */
+  @Test
+  public void testSkewAggregationGroupBy() {
+    // Reference accumulators: the expected values assume record j belongs to group j / (NUM_RECORDS / NUM_GROUPS)
+    Skewness[] referenceSkews = new Skewness[NUM_GROUPS];
+    for (int groupId = 0; groupId < NUM_GROUPS; groupId++) {
+      referenceSkews[groupId] = new Skewness();
+    }
+    for (int recordId = 0; recordId < NUM_RECORDS; recordId++) {
+      int groupId = recordId / (NUM_RECORDS / NUM_GROUPS);
+      referenceSkews[groupId].increment(_intColX[recordId]);
+    }
+
+    String query = "SELECT SKEWNESS(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
+    GroupByOperator groupByOperator = getOperator(query);
+    GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        NUM_RECORDS * 2, NUM_RECORDS);
+
+    AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
+    assertNotNull(groupByResult);
+    for (int groupId = 0; groupId < NUM_GROUPS; groupId++) {
+      PinotFourthMoment moment = (PinotFourthMoment) groupByResult.getResultForGroupId(0, groupId);
+      double expected = referenceSkews[groupId].getResult();
+      checkWithPrecisionForSkew(moment, NUM_RECORDS / NUM_GROUPS, expected);
+    }
+  }
+
+  /**
+   * Verifies KURTOSIS without GROUP BY on the int, long, float and double columns: first at the operator level
+   * (intermediate {@link PinotFourthMoment} objects), then on the broker response, and finally with a filter.
+   */
+  @Test
+  public void testKurtosisAggregationOnly() {
+    // Compute the expected values, one accumulator per queried column (same order as the SELECT list)
+    Kurtosis[] expectedKurt = new Kurtosis[4];
+    for (int i = 0; i < 4; i++) {
+      expectedKurt[i] = new Kurtosis();
+    }
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      expectedKurt[0].increment(_intColX[i]);
+      expectedKurt[1].increment(_longCol[i]);
+      expectedKurt[2].increment(_floatCol[i]);
+      expectedKurt[3].increment(_doubleColX[i]);
+    }
+
+    // Compute the query
+    String query =
+        "SELECT KURTOSIS(intColumnX), KURTOSIS(longColumn), KURTOSIS(floatColumn), "
+            + "KURTOSIS(doubleColumnX) FROM testTable";
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        NUM_RECORDS * 4, NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    // Validate the aggregation results
+    for (int i = 0; i < expectedKurt.length; i++) {
+      checkWithPrecisionForKurt((PinotFourthMoment) aggregationResult.get(i), NUM_RECORDS,
+          expectedKurt[i].getResult());
+    }
+
+    // Validate the response
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    Object[] results = brokerResponse.getResultTable().getRows().get(0);
+    for (int i = 0; i < expectedKurt.length; i++) {
+      assertTrue(
+          Precision.equalsWithRelativeTolerance((double) results[i], expectedKurt[i].getResult(), RELATIVE_EPSILON));
+    }
+
+    // Validate the response for a query with a filter; the expected value is computed over the first
+    // NUM_RECORDS / 2 records
+    query = "SELECT KURTOSIS(intColumnX) from testTable" + getFilter();
+    brokerResponse = getBrokerResponse(query);
+    results = brokerResponse.getResultTable().getRows().get(0);
+    Kurtosis filterExpectedKurt = new Kurtosis();
+    for (int i = 0; i < NUM_RECORDS / 2; i++) {
+      filterExpectedKurt.increment(_intColX[i]);
+    }
+    assertTrue(
+        Precision.equalsWithRelativeTolerance((double) results[0], filterExpectedKurt.getResult(), RELATIVE_EPSILON));
+  }
+
+  /** Verifies KURTOSIS with GROUP BY at the operator level against per-group reference accumulators. */
+  @Test
+  public void testKurtosisAggregationGroupBy() {
+    // Reference accumulators: the expected values assume record j belongs to group j / (NUM_RECORDS / NUM_GROUPS)
+    Kurtosis[] referenceKurts = new Kurtosis[NUM_GROUPS];
+    for (int groupId = 0; groupId < NUM_GROUPS; groupId++) {
+      referenceKurts[groupId] = new Kurtosis();
+    }
+    for (int recordId = 0; recordId < NUM_RECORDS; recordId++) {
+      int groupId = recordId / (NUM_RECORDS / NUM_GROUPS);
+      referenceKurts[groupId].increment(_intColX[recordId]);
+    }
+
+    String query = "SELECT KURTOSIS(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
+    GroupByOperator groupByOperator = getOperator(query);
+    GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        NUM_RECORDS * 2, NUM_RECORDS);
+
+    AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
+    assertNotNull(groupByResult);
+    for (int groupId = 0; groupId < NUM_GROUPS; groupId++) {
+      PinotFourthMoment moment = (PinotFourthMoment) groupByResult.getResultForGroupId(0, groupId);
+      double expected = referenceKurts[groupId].getResult();
+      checkWithPrecisionForKurt(moment, NUM_RECORDS / NUM_GROUPS, expected);
+    }
+  }
+
private void checkWithPrecisionForCovariance(CovarianceTuple tuple, double
sumX, double sumY, double sumXY,
int count) {
assertEquals(tuple.getCount(), count);
@@ -766,6 +935,16 @@ public class StatisticalQueriesTest extends
BaseQueriesTest {
}
}
+  /** Asserts the moment's value count and that its skewness matches {@code expectedSkew} within RELATIVE_EPSILON. */
+  private void checkWithPrecisionForSkew(PinotFourthMoment m4, int expectedCount, double expectedSkew) {
+    assertEquals(m4.getN(), expectedCount);
+    double actualSkew = m4.skew();
+    assertTrue(Precision.equalsWithRelativeTolerance(actualSkew, expectedSkew, RELATIVE_EPSILON));
+  }
+
+  /**
+   * Asserts the moment's value count and that its kurtosis matches {@code expectedKurtosis} within
+   * RELATIVE_EPSILON.
+   */
+  private void checkWithPrecisionForKurt(PinotFourthMoment m4, int expectedCount, double expectedKurtosis) {
+    assertEquals(m4.getN(), expectedCount);
+    assertTrue(Precision.equalsWithRelativeTolerance(m4.kurtosis(), expectedKurtosis, RELATIVE_EPSILON));
+  }
+
private double computeVariancePop(VarianceTuple varianceTuple) {
return varianceTuple.getM2() / varianceTuple.getCount();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/PinotFourthMoment.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/PinotFourthMoment.java
new file mode 100644
index 0000000000..f5ebb46a76
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/PinotFourthMoment.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import org.apache.commons.math.stat.descriptive.moment.FourthMoment;
+import org.apache.commons.math.stat.descriptive.moment.Kurtosis;
+import org.apache.commons.math.stat.descriptive.moment.Skewness;
+
+
+/**
+ * A {@link Comparable} implementation of the
+ * <a href="https://en.wikipedia.org/wiki/Moment_(mathematics)">Fourth Statistical Moment</a> that uses the
+ * apache commons algorithm for computing it in one pass. It additionally supports serialization and
+ * deserialization methods, which is helpful for combining moments across servers.
+ *
+ * <p>The commons implementation does not support parallel-computation, support for which is added
+ * in the {@link #combine(PinotFourthMoment)} method inspired by Presto's implementation.
+ *
+ * <pre>
+ *   Also See: <a href="https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook
+ *   /presto/operator/aggregation/AggregationUtils.java#L188">Presto's Implementation</a>
+ * </pre>
+ */
+public class PinotFourthMoment extends FourthMoment implements Comparable<PinotFourthMoment> {
+
+  // Orders by count, then by each moment in turn. The chain must not start from Comparator.naturalOrder():
+  // natural order calls compareTo(), which calls this comparator again, recursing until the stack overflows.
+  private static final Comparator<PinotFourthMoment> COMPARATOR =
+      Comparator.<PinotFourthMoment>comparingLong(x -> x.n)
+          .thenComparingDouble(x -> x.m1)
+          .thenComparingDouble(x -> x.m2)
+          .thenComparingDouble(x -> x.m3)
+          .thenComparingDouble(x -> x.m4);
+
+  /**
+   * Merges the state of {@code other} into this moment; {@code other} is not modified.
+   *
+   * @param other the moment to fold into this one
+   */
+  public void combine(PinotFourthMoment other) {
+    combine(other.n, other.m1, other.m2, other.m3, other.m4);
+  }
+
+  /**
+   * Merges a moment described by its raw components into this one, so that the result describes the union of
+   * both value sets.
+   *
+   * @param bN number of values behind the other moment; {@code 0} leaves this moment untouched
+   * @param bM1 first moment of the other side
+   * @param bM2 second moment of the other side
+   * @param bM3 third moment of the other side
+   * @param bM4 fourth moment of the other side
+   */
+  public void combine(long bN, double bM1, double bM2, double bM3, double bM4) {
+    if (bN == 0) {
+      // nothing to merge
+      return;
+    }
+    if (n == 0) {
+      // this side is empty: adopt the other side verbatim
+      n = bN;
+      m1 = bM1;
+      m2 = bM2;
+      m3 = bM3;
+      m4 = bM4;
+      return;
+    }
+
+    long combinedCount = n + bN;
+
+    // Counts are widened to double before any multiplication: long products such as aN * aN or n * n
+    // silently overflow once the counts grow past roughly three billion.
+    double aN = n;
+    double cN = bN;
+    double total = combinedCount;
+
+    double aM1 = m1;
+    double aM2 = m2;
+    double aM3 = m3;
+    double aM4 = m4;
+
+    double combinedM1 = (aN * aM1 + cN * bM1) / total;
+
+    double delta = bM1 - aM1;
+    double delta2 = delta * delta;
+    double combinedM2 = aM2 + bM2 + delta2 * aN * cN / total;
+
+    double delta3 = delta2 * delta;
+    double combinedM3 = aM3 + bM3
+        + delta3 * aN * cN * (aN - cN) / (total * total)
+        + 3d * delta * (aN * bM2 - cN * aM2) / total;
+
+    double delta4 = delta3 * delta;
+    double combinedM4 = aM4 + bM4
+        + delta4 * aN * cN * (aN * aN - aN * cN + cN * cN) / (total * total * total)
+        + 6.0 * delta2 * (aN * aN * bM2 + cN * cN * aM2) / (total * total)
+        + 4d * delta * (aN * bM3 - cN * aM3) / total;
+
+    n = combinedCount;
+    m1 = combinedM1;
+    m2 = combinedM2;
+    m3 = combinedM3;
+    m4 = combinedM4;
+  }
+
+  /** Returns the result of a commons-math {@link Skewness} statistic constructed over this moment. */
+  public double skew() {
+    return new Skewness(this).getResult();
+  }
+
+  /** Returns the result of a commons-math {@link Kurtosis} statistic constructed over this moment. */
+  public double kurtosis() {
+    return new Kurtosis(this).getResult();
+  }
+
+  /**
+   * Serializes this moment as the count ({@code long}) followed by m1, m2, m3 and m4 ({@code double} each),
+   * i.e. {@code Long.BYTES + 4 * Double.BYTES} bytes written through a freshly allocated {@link ByteBuffer}.
+   */
+  public byte[] serialize() {
+    ByteBuffer buff = ByteBuffer.allocate(Long.BYTES + Double.BYTES * 4);
+    buff.putLong(n)
+        .putDouble(m1)
+        .putDouble(m2)
+        .putDouble(m3)
+        .putDouble(m4);
+    return buff.array();
+  }
+
+  /** Deserializes a moment from a byte array laid out as written by {@link #serialize()}. */
+  public static PinotFourthMoment fromBytes(byte[] bytes) {
+    return fromBytes(ByteBuffer.wrap(bytes));
+  }
+
+  /**
+   * Deserializes a moment from the buffer's current position, reading the count and then m1 to m4 in the order
+   * written by {@link #serialize()}. Relative reads are used, so the buffer's position advances.
+   */
+  public static PinotFourthMoment fromBytes(ByteBuffer buff) {
+    PinotFourthMoment moment = new PinotFourthMoment();
+    moment.n = buff.getLong();
+    moment.m1 = buff.getDouble();
+    moment.m2 = buff.getDouble();
+    moment.m3 = buff.getDouble();
+    moment.m4 = buff.getDouble();
+    return moment;
+  }
+
+  /** Compares by count first, then by m1, m2, m3 and m4. */
+  @Override
+  public int compareTo(PinotFourthMoment o) {
+    return COMPARATOR.compare(this, o);
+  }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/PinotFourthMomentTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/PinotFourthMomentTest.java
new file mode 100644
index 0000000000..727bea7d87
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/PinotFourthMomentTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.Random;
+import java.util.stream.IntStream;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/** Unit tests for {@link PinotFourthMoment#combine(PinotFourthMoment)}. */
+public class PinotFourthMomentTest {
+
+  // Fixed seed: the inputs are still arbitrary, but a failing run can be reproduced with the exact same values.
+  private static final long SEED = 42L;
+  private static final int NUM_VALUES = 100;
+  private static final double DELTA = .01;
+
+  /** Generates the pseudo-random input shared by all tests. */
+  private static double[] randomValues() {
+    Random r = new Random(SEED);
+    return IntStream.generate(r::nextInt)
+        .limit(NUM_VALUES)
+        .mapToDouble(i -> (double) i)
+        .toArray();
+  }
+
+  @Test
+  public void shouldCombineMoments() {
+    // Given:
+    double[] xs = randomValues();
+
+    PinotFourthMoment a = new PinotFourthMoment();
+    PinotFourthMoment b = new PinotFourthMoment();
+    PinotFourthMoment c = new PinotFourthMoment();
+
+    // When: 'a' sees every value, while 'b' and 'c' each see one half and are then combined
+    for (int i = 0; i < xs.length; i++) {
+      a.increment(xs[i]);
+      (i < xs.length / 2 ? b : c).increment(xs[i]);
+    }
+    b.combine(c);
+
+    // Then:
+    assertEquals(b.skew(), a.skew(), DELTA);
+    assertEquals(b.kurtosis(), a.kurtosis(), DELTA);
+  }
+
+  @Test
+  public void shouldCombineLeftEmptyMoments() {
+    // Given:
+    double[] xs = randomValues();
+
+    PinotFourthMoment a = new PinotFourthMoment();
+    PinotFourthMoment b = new PinotFourthMoment();
+
+    // When:
+    for (double x : xs) {
+      a.increment(x);
+    }
+
+    b.combine(a);
+
+    // Then:
+    assertEquals(b.skew(), a.skew(), DELTA);
+    assertEquals(b.kurtosis(), a.kurtosis(), DELTA);
+  }
+
+  @Test
+  public void shouldCombineRightEmptyMoments() {
+    // Given:
+    double[] xs = randomValues();
+
+    PinotFourthMoment a = new PinotFourthMoment();
+    PinotFourthMoment b = new PinotFourthMoment();
+
+    // When:
+    for (double x : xs) {
+      a.increment(x);
+    }
+
+    double skewBeforeCombine = a.skew();
+    double kurtosisBeforeCombine = a.kurtosis();
+    a.combine(b);
+
+    // Then:
+    assertEquals(a.skew(), skewBeforeCombine, DELTA);
+    assertEquals(a.kurtosis(), kurtosisBeforeCombine, DELTA);
+  }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 584c99a7b2..beaac5718d 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -63,6 +63,8 @@ public enum AggregationFunctionType {
VARSAMP("varSamp"),
STDDEVPOP("stdDevPop"),
STDDEVSAMP("stdDevSamp"),
+ SKEWNESS("skewness"),
+ KURTOSIS("kurtosis"),
// Geo aggregation functions
STUNION("STUnion"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]