This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d12ce887a9 Minmaxrange null (#12252)
d12ce887a9 is described below
commit d12ce887a9adf5e4d6253ada533e9f0a5df71298
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Mar 19 02:06:54 2024 -0700
Minmaxrange null (#12252)
* New test framework candidate
* Improve the test system
* Improve the framework so that segments can be specified as strings
* Fix headers
* Improve assertions when there are nulls
* Improve error text
* Further improvements to the framework
* Add a base class that single-input aggregation functions can extend to
support null handling
* Fix issue in NullableSingleInputAggregationFunction.forEachNotNullInt
* Improve error message in NullEnabledQueriesTest
* Add new schema family
* Rename test schemas and table config
* Split AllNullQueriesTest into one test per query
* Revert change in AllNullQueriesTest that belongs to mode-null-support
branch
* Add null support in minmaxrange
* Adapt to the new framework
* Apply suggestions from the PR review
---
.../function/AggregationFunctionFactory.java | 2 +-
.../function/MinMaxRangeAggregationFunction.java | 103 +++++------
.../function/MinMaxRangeMVAggregationFunction.java | 2 +-
.../MinMaxRangeAggregationFunctionTest.java | 195 +++++++++++++++++++++
.../aggregator/MinMaxRangeValueAggregator.java | 2 +-
.../local/customobject/MinMaxRangePair.java | 8 +
6 files changed, 259 insertions(+), 53 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 8db0d730d7..3c449f1578 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -316,7 +316,7 @@ public class AggregationFunctionFactory {
}
}
case MINMAXRANGE:
- return new MinMaxRangeAggregationFunction(arguments);
+ return new MinMaxRangeAggregationFunction(arguments,
nullHandlingEnabled);
case DISTINCTCOUNT:
return new DistinctCountAggregationFunction(arguments,
nullHandlingEnabled);
case DISTINCTCOUNTBITMAP:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
index 1c039b9d14..28299429c6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
@@ -33,14 +33,14 @@ import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
-public class MinMaxRangeAggregationFunction extends
BaseSingleInputAggregationFunction<MinMaxRangePair, Double> {
+public class MinMaxRangeAggregationFunction extends
NullableSingleInputAggregationFunction<MinMaxRangePair, Double> {
- public MinMaxRangeAggregationFunction(List<ExpressionContext> arguments) {
- super(verifySingleArgument(arguments, "MIN_MAX_RANGE"));
+ public MinMaxRangeAggregationFunction(List<ExpressionContext> arguments,
boolean nullHandlingEnabled) {
+ super(verifySingleArgument(arguments, "MIN_MAX_RANGE"),
nullHandlingEnabled);
}
- protected MinMaxRangeAggregationFunction(ExpressionContext expression) {
- super(expression);
+ protected MinMaxRangeAggregationFunction(ExpressionContext expression,
boolean nullHandlingEnabled) {
+ super(expression, nullHandlingEnabled);
}
@Override
@@ -61,37 +61,29 @@ public class MinMaxRangeAggregationFunction extends
BaseSingleInputAggregationFu
@Override
public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
- double min = Double.POSITIVE_INFINITY;
- double max = Double.NEGATIVE_INFINITY;
BlockValSet blockValSet = blockValSetMap.get(_expression);
+ MinMaxRangePair minMax = new MinMaxRangePair();
+
if (blockValSet.getValueType() != DataType.BYTES) {
double[] doubleValues = blockValSet.getDoubleValuesSV();
- for (int i = 0; i < length; i++) {
- double value = doubleValues[i];
- if (value < min) {
- min = value;
- }
- if (value > max) {
- max = value;
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ double value = doubleValues[i];
+ minMax.apply(value);
}
- }
+ });
} else {
// Serialized MinMaxRangePair
byte[][] bytesValues = blockValSet.getBytesValuesSV();
- for (int i = 0; i < length; i++) {
- MinMaxRangePair minMaxRangePair =
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
- double minValue = minMaxRangePair.getMin();
- double maxValue = minMaxRangePair.getMax();
- if (minValue < min) {
- min = minValue;
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ MinMaxRangePair minMaxRangePair =
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+ minMax.apply(minMaxRangePair);
}
- if (maxValue > max) {
- max = maxValue;
- }
- }
+ });
}
- setAggregationResult(aggregationResultHolder, min, max);
+ setAggregationResult(aggregationResultHolder, minMax.getMin(),
minMax.getMax());
}
protected void setAggregationResult(AggregationResultHolder
aggregationResultHolder, double min, double max) {
@@ -109,17 +101,21 @@ public class MinMaxRangeAggregationFunction extends
BaseSingleInputAggregationFu
BlockValSet blockValSet = blockValSetMap.get(_expression);
if (blockValSet.getValueType() != DataType.BYTES) {
double[] doubleValues = blockValSet.getDoubleValuesSV();
- for (int i = 0; i < length; i++) {
- double value = doubleValues[i];
- setGroupByResult(groupKeyArray[i], groupByResultHolder, value, value);
- }
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ double value = doubleValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, value,
value);
+ }
+ });
} else {
// Serialized MinMaxRangePair
byte[][] bytesValues = blockValSet.getBytesValuesSV();
- for (int i = 0; i < length; i++) {
- MinMaxRangePair minMaxRangePair =
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
- setGroupByResult(groupKeyArray[i], groupByResultHolder,
minMaxRangePair.getMin(), minMaxRangePair.getMax());
- }
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ MinMaxRangePair minMaxRangePair =
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+ setGroupByResult(groupKeyArray[i], groupByResultHolder,
minMaxRangePair.getMin(), minMaxRangePair.getMax());
+ }
+ });
}
}
@@ -129,23 +125,27 @@ public class MinMaxRangeAggregationFunction extends
BaseSingleInputAggregationFu
BlockValSet blockValSet = blockValSetMap.get(_expression);
if (blockValSet.getValueType() != DataType.BYTES) {
double[] doubleValues = blockValSet.getDoubleValuesSV();
- for (int i = 0; i < length; i++) {
- double value = doubleValues[i];
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value, value);
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ double value = doubleValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, value);
+ }
}
- }
+ });
} else {
// Serialized MinMaxRangePair
byte[][] bytesValues = blockValSet.getBytesValuesSV();
- for (int i = 0; i < length; i++) {
- MinMaxRangePair minMaxRangePair =
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
- double min = minMaxRangePair.getMin();
- double max = minMaxRangePair.getMax();
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, min, max);
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ MinMaxRangePair minMaxRangePair =
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+ double min = minMaxRangePair.getMin();
+ double max = minMaxRangePair.getMax();
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, min, max);
+ }
}
- }
+ });
}
}
@@ -161,8 +161,8 @@ public class MinMaxRangeAggregationFunction extends
BaseSingleInputAggregationFu
@Override
public MinMaxRangePair extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
MinMaxRangePair minMaxRangePair = aggregationResultHolder.getResult();
- if (minMaxRangePair == null) {
- return new MinMaxRangePair(Double.POSITIVE_INFINITY,
Double.NEGATIVE_INFINITY);
+ if (minMaxRangePair == null && !_nullHandlingEnabled) {
+ return new MinMaxRangePair();
} else {
return minMaxRangePair;
}
@@ -171,8 +171,8 @@ public class MinMaxRangeAggregationFunction extends
BaseSingleInputAggregationFu
@Override
public MinMaxRangePair extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
MinMaxRangePair minMaxRangePair = groupByResultHolder.getResult(groupKey);
- if (minMaxRangePair == null) {
- return new MinMaxRangePair(Double.POSITIVE_INFINITY,
Double.NEGATIVE_INFINITY);
+ if (minMaxRangePair == null && !_nullHandlingEnabled) {
+ return new MinMaxRangePair();
} else {
return minMaxRangePair;
}
@@ -196,6 +196,9 @@ public class MinMaxRangeAggregationFunction extends
BaseSingleInputAggregationFu
@Override
public Double extractFinalResult(MinMaxRangePair intermediateResult) {
+ if (intermediateResult == null) {
+ return null;
+ }
return intermediateResult.getMax() - intermediateResult.getMin();
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
index 466a6b044f..534bdb41f2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
@@ -30,7 +30,7 @@ import org.apache.pinot.segment.spi.AggregationFunctionType;
public class MinMaxRangeMVAggregationFunction extends
MinMaxRangeAggregationFunction {
public MinMaxRangeMVAggregationFunction(List<ExpressionContext> arguments) {
- super(verifySingleArgument(arguments, "MIN_MAX_RANGE_MV"));
+ super(verifySingleArgument(arguments, "MIN_MAX_RANGE_MV"), false);
}
@Override
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunctionTest.java
new file mode 100644
index 0000000000..822399d66f
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunctionTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class MinMaxRangeAggregationFunctionTest extends
AbstractAggregationFunctionTest {
+
+ @DataProvider(name = "scenarios")
+ Object[] scenarios() {
+ return new Object[] {
+ new Scenario(FieldSpec.DataType.INT),
+ new Scenario(FieldSpec.DataType.LONG),
+ new Scenario(FieldSpec.DataType.FLOAT),
+ new Scenario(FieldSpec.DataType.DOUBLE),
+ };
+ }
+
+ public class Scenario {
+ private final FieldSpec.DataType _dataType;
+
+ public Scenario(FieldSpec.DataType dataType) {
+ _dataType = dataType;
+ }
+
+ public FluentQueryTest.DeclaringTable getDeclaringTable(boolean
nullHandlingEnabled) {
+ return givenSingleNullableFieldTable(_dataType, nullHandlingEnabled);
+ }
+
+ @Override
+ public String toString() {
+ return "Scenario{" + "dt=" + _dataType + '}';
+ }
+ }
+
+ String diffBetweenMinAnd9(FieldSpec.DataType dt) {
+ switch (dt) {
+ case INT: return "2.147483657E9";
+ case LONG: return "9.223372036854776E18";
+ case FLOAT: return "Infinity";
+ case DOUBLE: return "Infinity";
+ default: throw new IllegalArgumentException(dt.toString());
+ }
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrWithoutNull(Scenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField",
+ "null",
+ "1",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null",
+ "9",
+ "null"
+ )
+ .whenQuery("select minmaxrange(myField) from testTable")
+ .thenResultIs("DOUBLE", diffBetweenMinAnd9(scenario._dataType));
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrWithNull(Scenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "null",
+ "1",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null",
+ "9",
+ "null"
+ ).whenQuery("select minmaxrange(myField) from testTable")
+ .thenResultIs("DOUBLE", "8");
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrSvWithoutNull(Scenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField",
+ "null",
+ "1",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null",
+ "9",
+ "null"
+ ).whenQuery("select 'cte', minmaxrange(myField) from testTable group
by 'cte'")
+ .thenResultIs("STRING | DOUBLE", "cte | " +
diffBetweenMinAnd9(scenario._dataType));
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrSvWithNull(Scenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "null",
+ "1",
+ "null"
+ ).andOnSecondInstance("myField",
+ "null",
+ "9",
+ "null"
+ ).whenQuery("select 'cte', minmaxrange(myField) from testTable group
by 'cte'")
+ .thenResultIs("STRING | DOUBLE", "cte | 8");
+ }
+
+ String aggrSvSelfWithoutNullResult(FieldSpec.DataType dt) {
+ switch (dt) {
+ case INT: return "0";
+ case LONG: return "0";
+ case FLOAT: return "NaN";
+ case DOUBLE: return "NaN";
+ default: throw new IllegalArgumentException(dt.toString());
+ }
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrSvSelfWithoutNull(Scenario scenario) {
+ PinotDataType pinotDataType = scenario._dataType == FieldSpec.DataType.INT
+ ? PinotDataType.INTEGER :
PinotDataType.valueOf(scenario._dataType.name());
+
+ Object defaultNullValue;
+ switch (scenario._dataType) {
+ case INT:
+ defaultNullValue = Integer.MIN_VALUE;
+ break;
+ case LONG:
+ defaultNullValue = Long.MIN_VALUE;
+ break;
+ case FLOAT:
+ defaultNullValue = Float.NEGATIVE_INFINITY;
+ break;
+ case DOUBLE:
+ defaultNullValue = Double.NEGATIVE_INFINITY;
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected scenario data type " +
scenario._dataType);
+ }
+
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField",
+ "null",
+ "1",
+ "2"
+ ).andOnSecondInstance("myField",
+ "null",
+ "1",
+ "2"
+ ).whenQuery("select myField, minmaxrange(myField) from testTable group
by myField order by myField")
+ .thenResultIs(pinotDataType + " | DOUBLE",
+ defaultNullValue + " | " +
aggrSvSelfWithoutNullResult(scenario._dataType),
+ "1 | 0",
+ "2 | 0");
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrSvSelfWithNull(Scenario scenario) {
+ PinotDataType pinotDataType = scenario._dataType == FieldSpec.DataType.INT
+ ? PinotDataType.INTEGER :
PinotDataType.valueOf(scenario._dataType.name());
+
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField",
+ "null",
+ "1",
+ "2"
+ ).andOnSecondInstance("myField",
+ "null",
+ "1",
+ "2"
+ ).whenQuery("select myField, minmaxrange(myField) from testTable group
by myField order by myField")
+ .thenResultIs(pinotDataType + " | DOUBLE", "1 | 0", "2 | 0", "null |
null");
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
index d7de9ef01f..484e22b4f9 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
@@ -53,7 +53,7 @@ public class MinMaxRangeValueAggregator implements
ValueAggregator<Object, MinMa
value.apply(deserializeAggregatedValue((byte[]) rawValue));
} else {
double doubleValue = ((Number) rawValue).doubleValue();
- value.apply(doubleValue, doubleValue);
+ value.apply(doubleValue);
}
return value;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java
index 09e926378d..940e980821 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java
@@ -26,11 +26,19 @@ public class MinMaxRangePair implements
Comparable<MinMaxRangePair> {
private double _min;
private double _max;
+ public MinMaxRangePair() {
+ this(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
+ }
+
public MinMaxRangePair(double min, double max) {
_min = min;
_max = max;
}
+ public void apply(double value) {
+ apply(value, value);
+ }
+
public void apply(double min, double max) {
if (min < _min) {
_min = min;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]