This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cb3ccdcaee First with time (#12235)
cb3ccdcaee is described below
commit cb3ccdcaeec4387adced4e7210435ec6260807c7
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Mar 20 00:35:42 2024 -0700
First with time (#12235)
* Support nullHandling in FirstWithTime
* Add last_with_time as well
---
.../function/AggregationFunctionFactory.java | 26 +--
...irstDoubleValueWithTimeAggregationFunction.java | 55 +++---
...FirstFloatValueWithTimeAggregationFunction.java | 55 +++---
.../FirstIntValueWithTimeAggregationFunction.java | 54 +++---
.../FirstLongValueWithTimeAggregationFunction.java | 55 +++---
...irstStringValueWithTimeAggregationFunction.java | 55 +++---
.../function/FirstWithTimeAggregationFunction.java | 94 +++++++----
...LastDoubleValueWithTimeAggregationFunction.java | 55 +++---
.../LastFloatValueWithTimeAggregationFunction.java | 55 +++---
.../LastIntValueWithTimeAggregationFunction.java | 54 +++---
.../LastLongValueWithTimeAggregationFunction.java | 55 +++---
...LastStringValueWithTimeAggregationFunction.java | 55 +++---
.../function/LastWithTimeAggregationFunction.java | 34 +++-
.../NullableSingleInputAggregationFunction.java | 113 +++++++++++++
.../FirstWithTimeAggregationFunctionTest.java | 186 +++++++++++++++++++++
.../LastWithTimeAggregationFunctionTest.java | 174 +++++++++++++++++++
.../segment/local/customobject/ValueLongPair.java | 8 +
17 files changed, 831 insertions(+), 352 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 3c449f1578..eeed8608a4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -221,17 +221,19 @@ public class AggregationFunctionFactory {
DataType dataType =
DataType.valueOf(dataTypeExp.getLiteral().getStringValue().toUpperCase());
switch (dataType) {
case BOOLEAN:
- return new
FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, true);
+ return new
FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled,
+ true);
case INT:
- return new
FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol, false);
+ return new
FirstIntValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled,
+ false);
case LONG:
- return new
FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol);
+ return new
FirstLongValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled);
case FLOAT:
- return new
FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol);
+ return new
FirstFloatValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled);
case DOUBLE:
- return new
FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol);
+ return new
FirstDoubleValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled);
case STRING:
- return new
FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol);
+ return new
FirstStringValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled);
default:
throw new IllegalArgumentException("Unsupported data type for
FIRST_WITH_TIME: " + dataType);
}
@@ -300,17 +302,17 @@ public class AggregationFunctionFactory {
DataType dataType =
DataType.valueOf(dataTypeExp.getLiteral().getStringValue().toUpperCase());
switch (dataType) {
case BOOLEAN:
- return new
LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, true);
+ return new
LastIntValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled, true);
case INT:
- return new
LastIntValueWithTimeAggregationFunction(firstArgument, timeCol, false);
+ return new
LastIntValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled, false);
case LONG:
- return new
LastLongValueWithTimeAggregationFunction(firstArgument, timeCol);
+ return new
LastLongValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled);
case FLOAT:
- return new
LastFloatValueWithTimeAggregationFunction(firstArgument, timeCol);
+ return new
LastFloatValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled);
case DOUBLE:
- return new
LastDoubleValueWithTimeAggregationFunction(firstArgument, timeCol);
+ return new
LastDoubleValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled);
case STRING:
- return new
LastStringValueWithTimeAggregationFunction(firstArgument, timeCol);
+ return new
LastStringValueWithTimeAggregationFunction(firstArgument, timeCol,
nullHandlingEnabled);
default:
throw new IllegalArgumentException("Unsupported data type for
LAST_WITH_TIME: " + dataType);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
index 270a86d671..46d61f225d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstDoubleValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.DoubleLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
/**
@@ -41,8 +41,9 @@ import
org.apache.pinot.segment.local.customobject.ValueLongPair;
public class FirstDoubleValueWithTimeAggregationFunction extends
FirstWithTimeAggregationFunction<Double> {
private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR = new
DoubleLongPair(Double.NaN, Long.MAX_VALUE);
- public FirstDoubleValueWithTimeAggregationFunction(ExpressionContext
dataCol, ExpressionContext timeCol) {
- super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+ public FirstDoubleValueWithTimeAggregationFunction(ExpressionContext
dataCol, ExpressionContext timeCol,
+ boolean nullHandlingEnabled) {
+ super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE,
nullHandlingEnabled);
}
@Override
@@ -56,22 +57,8 @@ public class FirstDoubleValueWithTimeAggregationFunction
extends FirstWithTimeAg
}
@Override
- public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet) {
- ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
- Double firstData = defaultValueLongPair.getValue();
- long firstTime = defaultValueLongPair.getTime();
- double[] doubleValues = blockValSet.getDoubleValuesSV();
- long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- double data = doubleValues[i];
- long time = timeValues[i];
- if (time <= firstTime) {
- firstTime = time;
- firstData = data;
- }
- }
- setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ public Double readCell(BlockValSet block, int docId) {
+ return block.getDoubleValuesSV()[docId];
}
@Override
@@ -79,11 +66,15 @@ public class FirstDoubleValueWithTimeAggregationFunction
extends FirstWithTimeAg
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
double[] doubleValues = blockValSet.getDoubleValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- double data = doubleValues[i];
- long time = timeValues[i];
- setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
- }
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ double data = doubleValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ });
}
@Override
@@ -91,13 +82,17 @@ public class FirstDoubleValueWithTimeAggregationFunction
extends FirstWithTimeAg
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
double[] doubleValues = blockValSet.getDoubleValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- double value = doubleValues[i];
- long time = timeValues[i];
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ double value = doubleValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
}
- }
+ });
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
index e4c4da2400..d34fe755ec 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstFloatValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.FloatLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
/**
@@ -41,8 +41,9 @@ import
org.apache.pinot.segment.local.customobject.ValueLongPair;
public class FirstFloatValueWithTimeAggregationFunction extends
FirstWithTimeAggregationFunction<Float> {
private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new
FloatLongPair(Float.NaN, Long.MAX_VALUE);
- public FirstFloatValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol) {
- super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE);
+ public FirstFloatValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol,
+ boolean nullHandlingEnabled) {
+ super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE,
nullHandlingEnabled);
}
@Override
@@ -56,22 +57,8 @@ public class FirstFloatValueWithTimeAggregationFunction
extends FirstWithTimeAgg
}
@Override
- public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet) {
- ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair();
- Float firstData = defaultValueLongPair.getValue();
- long firstTime = defaultValueLongPair.getTime();
- float[] floatValues = blockValSet.getFloatValuesSV();
- long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- float data = floatValues[i];
- long time = timeValues[i];
- if (time <= firstTime) {
- firstTime = time;
- firstData = data;
- }
- }
- setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ public Float readCell(BlockValSet block, int docId) {
+ return block.getFloatValuesSV()[docId];
}
@Override
@@ -79,11 +66,15 @@ public class FirstFloatValueWithTimeAggregationFunction
extends FirstWithTimeAgg
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
float[] floatValues = blockValSet.getFloatValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- float data = floatValues[i];
- long time = timeValues[i];
- setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
- }
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ float data = floatValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ });
}
@Override
@@ -91,13 +82,17 @@ public class FirstFloatValueWithTimeAggregationFunction
extends FirstWithTimeAgg
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
float[] floatValues = blockValSet.getFloatValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- float value = floatValues[i];
- long time = timeValues[i];
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ float value = floatValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
}
- }
+ });
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
index 8623f0bdc5..97b5829968 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstIntValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.IntLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
/**
@@ -46,8 +46,8 @@ public class FirstIntValueWithTimeAggregationFunction extends
FirstWithTimeAggre
private final boolean _isBoolean;
public FirstIntValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol,
- boolean isBoolean) {
- super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE);
+ boolean nullHandlingEnabled, boolean isBoolean) {
+ super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE,
nullHandlingEnabled);
_isBoolean = isBoolean;
}
@@ -62,22 +62,8 @@ public class FirstIntValueWithTimeAggregationFunction
extends FirstWithTimeAggre
}
@Override
- public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet) {
- ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair();
- Integer firstData = defaultValueLongPair.getValue();
- long firstTime = defaultValueLongPair.getTime();
- int[] intValues = blockValSet.getIntValuesSV();
- long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- int data = intValues[i];
- long time = timeValues[i];
- if (time <= firstTime) {
- firstTime = time;
- firstData = data;
- }
- }
- setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ public Integer readCell(BlockValSet block, int docId) {
+ return block.getIntValuesSV()[docId];
}
@Override
@@ -85,11 +71,15 @@ public class FirstIntValueWithTimeAggregationFunction
extends FirstWithTimeAggre
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
int[] intValues = blockValSet.getIntValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- int data = intValues[i];
- long time = timeValues[i];
- setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
- }
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ int data = intValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ });
}
@Override
@@ -97,13 +87,17 @@ public class FirstIntValueWithTimeAggregationFunction
extends FirstWithTimeAggre
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
int[] intValues = blockValSet.getIntValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- int value = intValues[i];
- long time = timeValues[i];
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ int value = intValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
}
- }
+ });
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
index 0f5afeaa97..9b1fd34d2a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstLongValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.LongLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
/**
@@ -41,8 +41,9 @@ import
org.apache.pinot.segment.local.customobject.ValueLongPair;
public class FirstLongValueWithTimeAggregationFunction extends
FirstWithTimeAggregationFunction<Long> {
private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR = new
LongLongPair(Long.MIN_VALUE, Long.MAX_VALUE);
- public FirstLongValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol) {
- super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE);
+ public FirstLongValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol,
+ boolean nullHandlingEnabled) {
+ super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE,
nullHandlingEnabled);
}
@Override
@@ -56,22 +57,8 @@ public class FirstLongValueWithTimeAggregationFunction
extends FirstWithTimeAggr
}
@Override
- public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet) {
- ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair();
- Long firstData = defaultValueLongPair.getValue();
- long firstTime = defaultValueLongPair.getTime();
- long[] longValues = blockValSet.getLongValuesSV();
- long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- long data = longValues[i];
- long time = timeValues[i];
- if (time <= firstTime) {
- firstTime = time;
- firstData = data;
- }
- }
- setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ public Long readCell(BlockValSet block, int docId) {
+ return block.getLongValuesSV()[docId];
}
@Override
@@ -79,11 +66,15 @@ public class FirstLongValueWithTimeAggregationFunction
extends FirstWithTimeAggr
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
long[] longValues = blockValSet.getLongValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- long data = longValues[i];
- long time = timeValues[i];
- setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
- }
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ long data = longValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ });
}
@Override
@@ -91,13 +82,17 @@ public class FirstLongValueWithTimeAggregationFunction
extends FirstWithTimeAggr
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
long[] longValues = blockValSet.getLongValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- long value = longValues[i];
- long time = timeValues[i];
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ long value = longValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
}
- }
+ });
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
index c30f4bfacb..4fe34117d2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstStringValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.StringLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
/**
@@ -41,8 +41,9 @@ import
org.apache.pinot.segment.local.customobject.ValueLongPair;
public class FirstStringValueWithTimeAggregationFunction extends
FirstWithTimeAggregationFunction<String> {
private final static ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new
StringLongPair("", Long.MAX_VALUE);
- public FirstStringValueWithTimeAggregationFunction(ExpressionContext
dataCol, ExpressionContext timeCol) {
- super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE);
+ public FirstStringValueWithTimeAggregationFunction(ExpressionContext
dataCol, ExpressionContext timeCol,
+ boolean nullHandlingEnabled) {
+ super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE,
nullHandlingEnabled);
}
@Override
@@ -56,22 +57,8 @@ public class FirstStringValueWithTimeAggregationFunction
extends FirstWithTimeAg
}
@Override
- public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet) {
- ValueLongPair<String> defaultValueLongPair = getDefaultValueTimePair();
- String firstData = defaultValueLongPair.getValue();
- long firstTime = defaultValueLongPair.getTime();
- String[] stringValues = blockValSet.getStringValuesSV();
- long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- String data = stringValues[i];
- long time = timeValues[i];
- if (time <= firstTime) {
- firstTime = time;
- firstData = data;
- }
- }
- setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ public String readCell(BlockValSet block, int docId) {
+ return block.getStringValuesSV()[docId];
}
@Override
@@ -79,11 +66,15 @@ public class FirstStringValueWithTimeAggregationFunction
extends FirstWithTimeAg
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
String[] stringValues = blockValSet.getStringValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- String data = stringValues[i];
- long time = timeValues[i];
- setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
- }
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ String data = stringValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ });
}
@Override
@@ -91,13 +82,17 @@ public class FirstStringValueWithTimeAggregationFunction
extends FirstWithTimeAg
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
String[] stringValues = blockValSet.getStringValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- String value = stringValues[i];
- long time = timeValues[i];
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ String value = stringValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
}
- }
+ });
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
index 7049d0f1db..3d04803fde 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunction.java
@@ -29,9 +29,11 @@ import
org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.IntIterator;
/**
@@ -47,14 +49,13 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class FirstWithTimeAggregationFunction<V extends Comparable<V>>
- extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> {
+ extends NullableSingleInputAggregationFunction<ValueLongPair<V>, V> {
protected final ExpressionContext _timeCol;
private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>>
_objectSerDe;
- public FirstWithTimeAggregationFunction(ExpressionContext dataCol,
- ExpressionContext timeCol,
- ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) {
- super(dataCol);
+ public FirstWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol,
+ ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe,
boolean nullHandlingEnabled) {
+ super(dataCol, nullHandlingEnabled);
_timeCol = timeCol;
_objectSerDe = objectSerDe;
}
@@ -63,8 +64,7 @@ public abstract class FirstWithTimeAggregationFunction<V
extends Comparable<V>>
public abstract ValueLongPair<V> getDefaultValueTimePair();
- public abstract void aggregateResultWithRawData(int length,
AggregationResultHolder aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet);
+ public abstract V readCell(BlockValSet block, int docId);
public abstract void aggregateGroupResultWithRawDataSv(int length,
int[] groupKeyArray,
@@ -100,23 +100,45 @@ public abstract class FirstWithTimeAggregationFunction<V
extends Comparable<V>>
BlockValSet blockValSet = blockValSetMap.get(_expression);
BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
if (blockValSet.getValueType() != DataType.BYTES) {
- aggregateResultWithRawData(length, aggregationResultHolder, blockValSet,
blockTimeSet);
+ IntLongPair defaultPair = new IntLongPair(Integer.MIN_VALUE,
Long.MAX_VALUE);
+ long[] timeValues = blockTimeSet.getLongValuesSV();
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, blockTimeSet);
+ IntLongPair bestPair = foldNotNull(length, nullIdxIterator, defaultPair,
(pair, from, to) -> {
+ IntLongPair actualPair = pair;
+ for (int i = from; i < to; i++) {
+ long time = timeValues[i];
+ if (time <= actualPair.getTime()) {
+ actualPair = new IntLongPair(i, time);
+ }
+ }
+ return actualPair;
+ });
+ V bestValue;
+ if (bestPair.getValue() < 0) {
+ bestValue = getDefaultValueTimePair().getValue();
+ } else {
+ bestValue = readCell(blockValSet, bestPair.getValue());
+ }
+ setAggregationResult(aggregationResultHolder, bestValue,
bestPair.getTime());
} else {
+ // We assume bytes contain the binary serialization of FirstPair
ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair();
- V firstData = defaultValueLongPair.getValue();
- long firstTime = defaultValueLongPair.getTime();
- // Serialized FirstPair
+
+ ValueLongPair<V> result =
constructValueLongPair(defaultValueLongPair.getValue(),
defaultValueLongPair.getTime());
byte[][] bytesValues = blockValSet.getBytesValuesSV();
- for (int i = 0; i < length; i++) {
- ValueLongPair<V> firstWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
- V data = firstWithTimePair.getValue();
- long time = firstWithTimePair.getTime();
- if (time <= firstTime) {
- firstTime = time;
- firstData = data;
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ ValueLongPair<V> firstWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
+ long time = firstWithTimePair.getTime();
+ if (time < result.getTime()) {
+ result.setTime(time);
+ result.setValue(firstWithTimePair.getValue());
+ }
}
- }
- setAggregationResult(aggregationResultHolder, firstData, firstTime);
+ });
+
+ setAggregationResult(aggregationResultHolder, result.getValue(),
result.getTime());
}
}
@@ -138,13 +160,15 @@ public abstract class FirstWithTimeAggregationFunction<V
extends Comparable<V>>
} else {
// Serialized FirstPair
byte[][] bytesValues = blockValSet.getBytesValuesSV();
- for (int i = 0; i < length; i++) {
- ValueLongPair<V> firstWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
- setGroupByResult(groupKeyArray[i],
- groupByResultHolder,
- firstWithTimePair.getValue(),
- firstWithTimePair.getTime());
- }
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ ValueLongPair<V> firstWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
+ setGroupByResult(groupKeyArray[i],
+ groupByResultHolder,
+ firstWithTimePair.getValue(),
+ firstWithTimePair.getTime());
+ }
+ });
}
}
@@ -158,14 +182,16 @@ public abstract class FirstWithTimeAggregationFunction<V
extends Comparable<V>>
} else {
// Serialized ValueTimePair
byte[][] bytesValues = blockValSet.getBytesValuesSV();
- for (int i = 0; i < length; i++) {
- ValueLongPair<V> firstWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
- V data = firstWithTimePair.getValue();
- long time = firstWithTimePair.getTime();
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, data, time);
+ forEachNotNull(length, blockValSet, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ ValueLongPair<V> firstWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
+ V data = firstWithTimePair.getValue();
+ long time = firstWithTimePair.getTime();
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, data, time);
+ }
}
- }
+ });
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
index 93aec2b0c1..398455c799 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.DoubleLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
/**
@@ -41,8 +41,9 @@ import
org.apache.pinot.segment.local.customobject.ValueLongPair;
public class LastDoubleValueWithTimeAggregationFunction extends
LastWithTimeAggregationFunction<Double> {
private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR = new
DoubleLongPair(Double.NaN, Long.MIN_VALUE);
- public LastDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol) {
- super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+ public LastDoubleValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol,
+ boolean nullHandlingEnabled) {
+ super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE,
nullHandlingEnabled);
}
@Override
@@ -56,22 +57,8 @@ public class LastDoubleValueWithTimeAggregationFunction
extends LastWithTimeAggr
}
@Override
- public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet) {
- ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
- Double lastData = defaultValueLongPair.getValue();
- long lastTime = defaultValueLongPair.getTime();
- double[] doubleValues = blockValSet.getDoubleValuesSV();
- long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- double data = doubleValues[i];
- long time = timeValues[i];
- if (time >= lastTime) {
- lastTime = time;
- lastData = data;
- }
- }
- setAggregationResult(aggregationResultHolder, lastData, lastTime);
+ public Double readCell(BlockValSet block, int docId) {
+ return block.getDoubleValuesSV()[docId];
}
@Override
@@ -79,11 +66,15 @@ public class LastDoubleValueWithTimeAggregationFunction
extends LastWithTimeAggr
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
double[] doubleValues = blockValSet.getDoubleValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- double data = doubleValues[i];
- long time = timeValues[i];
- setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
- }
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ double data = doubleValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ });
}
@Override
@@ -91,13 +82,17 @@ public class LastDoubleValueWithTimeAggregationFunction
extends LastWithTimeAggr
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
double[] doubleValues = blockValSet.getDoubleValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- double value = doubleValues[i];
- long time = timeValues[i];
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ double value = doubleValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
}
- }
+ });
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
index 50635874c9..6d7eaaa172 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.FloatLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
/**
@@ -41,8 +41,9 @@ import
org.apache.pinot.segment.local.customobject.ValueLongPair;
public class LastFloatValueWithTimeAggregationFunction extends
LastWithTimeAggregationFunction<Float> {
private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new
FloatLongPair(Float.NaN, Long.MIN_VALUE);
- public LastFloatValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol) {
- super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE);
+ public LastFloatValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol,
+ boolean nullHandlingEnabled) {
+ super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE,
nullHandlingEnabled);
}
@Override
@@ -56,22 +57,8 @@ public class LastFloatValueWithTimeAggregationFunction
extends LastWithTimeAggre
}
@Override
- public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet) {
- ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair();
- Float lastData = defaultValueLongPair.getValue();
- long lastTime = defaultValueLongPair.getTime();
- float[] floatValues = blockValSet.getFloatValuesSV();
- long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- float data = floatValues[i];
- long time = timeValues[i];
- if (time >= lastTime) {
- lastTime = time;
- lastData = data;
- }
- }
- setAggregationResult(aggregationResultHolder, lastData, lastTime);
+ public Float readCell(BlockValSet block, int docId) {
+ return block.getFloatValuesSV()[docId];
}
@Override
@@ -79,11 +66,15 @@ public class LastFloatValueWithTimeAggregationFunction
extends LastWithTimeAggre
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
float[] floatValues = blockValSet.getFloatValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- float data = floatValues[i];
- long time = timeValues[i];
- setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
- }
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ float data = floatValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ });
}
@Override
@@ -91,13 +82,17 @@ public class LastFloatValueWithTimeAggregationFunction
extends LastWithTimeAggre
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
float[] floatValues = blockValSet.getFloatValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- float value = floatValues[i];
- long time = timeValues[i];
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ float value = floatValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
}
- }
+ });
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
index 87d33ce0df..e124f58d0e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.IntLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
/**
@@ -46,8 +46,8 @@ public class LastIntValueWithTimeAggregationFunction extends
LastWithTimeAggrega
private final boolean _isBoolean;
public LastIntValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol,
- boolean isBoolean) {
- super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE);
+ boolean nullHandlingEnabled, boolean isBoolean) {
+ super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE,
nullHandlingEnabled);
_isBoolean = isBoolean;
}
@@ -62,22 +62,8 @@ public class LastIntValueWithTimeAggregationFunction extends
LastWithTimeAggrega
}
@Override
- public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet) {
- ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair();
- Integer lastData = defaultValueLongPair.getValue();
- long lastTime = defaultValueLongPair.getTime();
- int[] intValues = blockValSet.getIntValuesSV();
- long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- int data = intValues[i];
- long time = timeValues[i];
- if (time >= lastTime) {
- lastTime = time;
- lastData = data;
- }
- }
- setAggregationResult(aggregationResultHolder, lastData, lastTime);
+ public Integer readCell(BlockValSet block, int docId) {
+ return block.getIntValuesSV()[docId];
}
@Override
@@ -85,11 +71,15 @@ public class LastIntValueWithTimeAggregationFunction
extends LastWithTimeAggrega
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
int[] intValues = blockValSet.getIntValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- int data = intValues[i];
- long time = timeValues[i];
- setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
- }
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ int data = intValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ });
}
@Override
@@ -97,13 +87,17 @@ public class LastIntValueWithTimeAggregationFunction
extends LastWithTimeAggrega
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
int[] intValues = blockValSet.getIntValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- int value = intValues[i];
- long time = timeValues[i];
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ int value = intValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
}
- }
+ });
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
index 248487d635..6c0e08e761 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.LongLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
/**
@@ -41,8 +41,9 @@ import
org.apache.pinot.segment.local.customobject.ValueLongPair;
public class LastLongValueWithTimeAggregationFunction extends
LastWithTimeAggregationFunction<Long> {
private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR = new
LongLongPair(Long.MIN_VALUE, Long.MIN_VALUE);
- public LastLongValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol) {
- super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE);
+ public LastLongValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol,
+ boolean nullHandlingEnabled) {
+ super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE,
nullHandlingEnabled);
}
@Override
@@ -56,22 +57,8 @@ public class LastLongValueWithTimeAggregationFunction
extends LastWithTimeAggreg
}
@Override
- public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet) {
- ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair();
- Long lastData = defaultValueLongPair.getValue();
- long lastTime = defaultValueLongPair.getTime();
- long[] longValues = blockValSet.getLongValuesSV();
- long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- long data = longValues[i];
- long time = timeValues[i];
- if (time >= lastTime) {
- lastTime = time;
- lastData = data;
- }
- }
- setAggregationResult(aggregationResultHolder, lastData, lastTime);
+ public Long readCell(BlockValSet block, int docId) {
+ return block.getLongValuesSV()[docId];
}
@Override
@@ -79,11 +66,15 @@ public class LastLongValueWithTimeAggregationFunction
extends LastWithTimeAggreg
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
long[] longValues = blockValSet.getLongValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- long data = longValues[i];
- long time = timeValues[i];
- setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
- }
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ long data = longValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ });
}
@Override
@@ -91,13 +82,17 @@ public class LastLongValueWithTimeAggregationFunction
extends LastWithTimeAggreg
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
long[] longValues = blockValSet.getLongValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- long value = longValues[i];
- long time = timeValues[i];
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ long value = longValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
}
- }
+ });
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
index 0f10e9399d..2b200b157a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
@@ -22,10 +22,10 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.segment.local.customobject.StringLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.roaringbitmap.IntIterator;
/**
@@ -41,8 +41,9 @@ import
org.apache.pinot.segment.local.customobject.ValueLongPair;
public class LastStringValueWithTimeAggregationFunction extends
LastWithTimeAggregationFunction<String> {
private final static ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new
StringLongPair("", Long.MIN_VALUE);
- public LastStringValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol) {
- super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE);
+ public LastStringValueWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol,
+ boolean nullHandlingEnabled) {
+ super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE,
nullHandlingEnabled);
}
@Override
@@ -56,22 +57,8 @@ public class LastStringValueWithTimeAggregationFunction
extends LastWithTimeAggr
}
@Override
- public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet) {
- ValueLongPair<String> defaultValueLongPair = getDefaultValueTimePair();
- String lastData = defaultValueLongPair.getValue();
- long lastTime = defaultValueLongPair.getTime();
- String[] stringValues = blockValSet.getStringValuesSV();
- long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- String data = stringValues[i];
- long time = timeValues[i];
- if (time >= lastTime) {
- lastTime = time;
- lastData = data;
- }
- }
- setAggregationResult(aggregationResultHolder, lastData, lastTime);
+ public String readCell(BlockValSet block, int docId) {
+ return block.getStringValuesSV()[docId];
}
@Override
@@ -79,11 +66,15 @@ public class LastStringValueWithTimeAggregationFunction
extends LastWithTimeAggr
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
String[] stringValues = blockValSet.getStringValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- String data = stringValues[i];
- long time = timeValues[i];
- setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
- }
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ String data = stringValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ });
}
@Override
@@ -91,13 +82,17 @@ public class LastStringValueWithTimeAggregationFunction
extends LastWithTimeAggr
GroupByResultHolder groupByResultHolder, BlockValSet blockValSet,
BlockValSet timeValSet) {
String[] stringValues = blockValSet.getStringValuesSV();
long[] timeValues = timeValSet.getLongValuesSV();
- for (int i = 0; i < length; i++) {
- String value = stringValues[i];
- long time = timeValues[i];
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value, time);
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, timeValSet);
+ forEachNotNull(length, nullIdxIterator, (from, to) -> {
+ for (int i = from; i < to; i++) {
+ String value = stringValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
}
- }
+ });
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
index cd4b0576a6..565445ae6f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
@@ -29,9 +29,11 @@ import
org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
import org.apache.pinot.segment.local.customobject.ValueLongPair;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.IntIterator;
/**
@@ -47,14 +49,14 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>>
- extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> {
+ extends NullableSingleInputAggregationFunction<ValueLongPair<V>, V> {
protected final ExpressionContext _timeCol;
private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>>
_objectSerDe;
public LastWithTimeAggregationFunction(ExpressionContext dataCol,
ExpressionContext timeCol,
- ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) {
- super(dataCol);
+ ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe,
boolean nullHandlingEnabled) {
+ super(dataCol, nullHandlingEnabled);
_timeCol = timeCol;
_objectSerDe = objectSerDe;
}
@@ -63,8 +65,7 @@ public abstract class LastWithTimeAggregationFunction<V
extends Comparable<V>>
public abstract ValueLongPair<V> getDefaultValueTimePair();
- public abstract void aggregateResultWithRawData(int length,
AggregationResultHolder aggregationResultHolder,
- BlockValSet blockValSet, BlockValSet timeValSet);
+ public abstract V readCell(BlockValSet block, int docId);
public abstract void aggregateGroupResultWithRawDataSv(int length,
int[] groupKeyArray,
@@ -100,8 +101,29 @@ public abstract class LastWithTimeAggregationFunction<V
extends Comparable<V>>
BlockValSet blockValSet = blockValSetMap.get(_expression);
BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
if (blockValSet.getValueType() != DataType.BYTES) {
- aggregateResultWithRawData(length, aggregationResultHolder, blockValSet,
blockTimeSet);
+ IntLongPair defaultPair = new IntLongPair(Integer.MIN_VALUE,
Long.MIN_VALUE);
+ long[] timeValues = blockTimeSet.getLongValuesSV();
+
+ IntIterator nullIdxIterator = orNullIterator(blockValSet, blockTimeSet);
+ IntLongPair bestPair = foldNotNull(length, nullIdxIterator, defaultPair,
(pair, from, to) -> {
+ IntLongPair actualPair = pair;
+ for (int i = from; i < to; i++) {
+ long time = timeValues[i];
+ if (time >= actualPair.getTime()) {
+ actualPair = new IntLongPair(i, time);
+ }
+ }
+ return actualPair;
+ });
+ V bestValue;
+ if (bestPair.getValue() < 0) {
+ bestValue = getDefaultValueTimePair().getValue();
+ } else {
+ bestValue = readCell(blockValSet, bestPair.getValue());
+ }
+ setAggregationResult(aggregationResultHolder, bestValue,
bestPair.getTime());
} else {
+ // We assume bytes contain the binary serialization of FirstPair
ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair();
V lastData = defaultValueLongPair.getValue();
long lastTime = defaultValueLongPair.getTime();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java
index 0a42db7442..78f1ae1269 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/NullableSingleInputAggregationFunction.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
@@ -138,4 +139,116 @@ public abstract class
NullableSingleInputAggregationFunction<I, F extends Compar
}
return acum;
}
+
+ public IntIterator orNullIterator(BlockValSet valSet1, BlockValSet valSet2) {
+ if (!_nullHandlingEnabled) {
+ return EmptyIntIterator.INSTANCE;
+ } else {
+ RoaringBitmap nullBlock1 = valSet1.getNullBitmap();
+ RoaringBitmap nullBlock2 = valSet2.getNullBitmap();
+ if (nullBlock1 == null) {
+ return nullBlock2 == null ? EmptyIntIterator.INSTANCE :
nullBlock2.getIntIterator();
+ } else if (nullBlock2 == null) {
+ return nullBlock1.getIntIterator();
+ } else {
+ return new MinIntIterator(nullBlock1.getIntIterator(),
nullBlock2.getIntIterator());
+ }
+ }
+ }
+
+ public static class EmptyIntIterator implements IntIterator {
+
+ public static final EmptyIntIterator INSTANCE = new EmptyIntIterator();
+
+ private EmptyIntIterator() {
+ }
+
+ @Override
+ public IntIterator clone() {
+ return this;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public int next() {
+ throw new NoSuchElementException();
+ }
+ }
+
+ public static class MinIntIterator implements IntIterator {
+ private final IntIterator _it1;
+ private final IntIterator _it2;
+ private int _next1 = -1;
+ private int _next2 = -1;
+
+ /**
+ * @param it1 it has to iterate in ascending order and the min value is 0
+ * @param it2 it has to iterate in ascending order and the min value is 0
+ */
+ public MinIntIterator(IntIterator it1, IntIterator it2) {
+ _it1 = it1;
+ _it2 = it2;
+ }
+
+ @Override
+ public IntIterator clone() {
+ return new MinIntIterator(_it1.clone(), _it2.clone());
+ }
+
+ @Override
+ public boolean hasNext() {
+ return _next1 > 0 || _next2 > 0 || _it1.hasNext() || _it2.hasNext();
+ }
+
+ @Override
+ public int next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (_next1 < 0) {
+ if (_it1.hasNext()) {
+ _next1 = _it1.next();
+ } else { //it1 is completely consumed
+ if (_next2 >= 0) { // consume the last cached value
+ return consume2();
+ } else { // after that, return all values from it2
+ return _it2.next();
+ }
+ }
+ }
+ if (_next2 < 0) {
+ if (_it2.hasNext()) {
+ _next2 = _it2.next();
+ } else { //it2 is completely consumed
+ if (_next1 >= 0) { // consume the last cached value
+ return consume1();
+ } else { // after that, return all values from it1
+ return _it1.next();
+ }
+ }
+ }
+ assert _next1 >= 0 && _next2 >= 0;
+ if (_next1 <= _next2) {
+ return consume1();
+ } else {
+ return consume2();
+ }
+ }
+
+ private int consume1() {
+ int nextVal = _next1;
+ _next1 = -1;
+ return nextVal;
+ }
+
+ private int consume2() {
+ int nextVal = _next2;
+ _next2 = -1;
+ return nextVal;
+ }
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunctionTest.java
new file mode 100644
index 0000000000..541b007d47
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/FirstWithTimeAggregationFunctionTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class FirstWithTimeAggregationFunctionTest extends
AbstractAggregationFunctionTest {
+
+ @DataProvider(name = "scenarios")
+ Object[] scenarios() {
+ return new Object[] {
+ new Scenario(FieldSpec.DataType.INT, "1", "2", "-2147483648"),
+ new Scenario(FieldSpec.DataType.LONG, "1", "2",
"-9223372036854775808"),
+ new Scenario(FieldSpec.DataType.FLOAT, "1", "2", "-Infinity"),
+ new Scenario(FieldSpec.DataType.DOUBLE, "1", "2", "-Infinity"),
+ new Scenario(FieldSpec.DataType.STRING, "a", "b", "\"null\""),
+ };
+ }
+
+ public class Scenario {
+ private final PinotDataType _pinotDataType;
+ private final FieldSpec.DataType _dataType;
+ private final String _valAsStr1;
+ private final String _valAsStr2;
+ private final String _defaultNullValue;
+
+ public Scenario(FieldSpec.DataType dataType, String valAsStr1, String
valAsStr2, String defaultNullValue) {
+ _dataType = dataType;
+ _valAsStr1 = valAsStr1;
+ _valAsStr2 = valAsStr2;
+ _defaultNullValue = defaultNullValue;
+ _pinotDataType =
+ _dataType == FieldSpec.DataType.INT ? PinotDataType.INTEGER :
PinotDataType.valueOf(_dataType.name());
+ }
+
+ public FluentQueryTest.DeclaringTable getDeclaringTable(boolean
nullHandlingEnabled) {
+ Schema schema = new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addDimensionField("myField", _dataType, f -> f.setNullable(true))
+ .addDimensionField("timeField", FieldSpec.DataType.TIMESTAMP)
+ .build();
+ TableConfigBuilder tableConfigBuilder = new
TableConfigBuilder(TableType.OFFLINE)
+ .setTableName("testTable");
+
+ return FluentQueryTest.withBaseDir(_baseDir)
+ .withNullHandling(nullHandlingEnabled)
+ .givenTable(schema, tableConfigBuilder.build());
+ }
+
+ @Override
+ public String toString() {
+ return "Scenario{" + "dt=" + _dataType + ", val1='" + _valAsStr1 + '\''
+ ", val2='"
+ + _valAsStr2 + '\'' + '}';
+ }
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrWithoutNull(Scenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField | timeField",
+ "null | 1",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ "null | 4",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ )
+ .whenQuery("select FIRST_WITH_TIME(myField, timeField, '" +
scenario._dataType + "') from testTable")
+ .thenResultIs(scenario._pinotDataType.name(),
scenario._defaultNullValue);
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrWithNull(Scenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField | timeField",
+ "null | 1",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ "null | 4",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ )
+ .whenQuery("select FIRST_WITH_TIME(myField, timeField, '" +
scenario._dataType + "') from testTable")
+ .thenResultIs(scenario._pinotDataType.name(), scenario._valAsStr1);
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrSvWithoutNull(Scenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField | timeField",
+ "null | 1",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ "null | 4",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ ).whenQuery("select 'cte', FIRST_WITH_TIME(myField, timeField, '" +
scenario._dataType + "') as mode "
+ + "from testTable "
+ + "group by 'cte'")
+ .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " +
scenario._defaultNullValue);
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrSvWithNull(Scenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField | timeField",
+ "null | 1",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ "null | 4",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ ).whenQuery("select 'cte', FIRST_WITH_TIME(myField, timeField, '" +
scenario._dataType + "') as mode "
+ + "from testTable "
+ + "group by 'cte'")
+ .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " +
scenario._valAsStr1);
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrMvWithoutNull(Scenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField | timeField",
+ "null | 1",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ "null | 4",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+ + "FIRST_WITH_TIME(myField, timeField, '" + scenario._dataType +
"') as mode "
+ + "from testTable "
+ + "group by 'cte'")
+ .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+ "cte1 | cte2 | " + scenario._defaultNullValue);
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrMvWithNull(Scenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField | timeField",
+ "null | 1",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ "null | 4",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+ + "FIRST_WITH_TIME(myField, timeField, '" + scenario._dataType +
"') as mode "
+ + "from testTable "
+ + "group by 'cte'")
+ .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+ "cte1 | cte2 | " + scenario._valAsStr1);
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunctionTest.java
new file mode 100644
index 0000000000..9c612ae538
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunctionTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class LastWithTimeAggregationFunctionTest extends
AbstractAggregationFunctionTest {
+
+ @DataProvider(name = "scenarios")
+ Object[] scenarios() {
+ return new Object[] {
+ new Scenario(FieldSpec.DataType.INT, "1", "2", "-2147483648"),
+ new Scenario(FieldSpec.DataType.LONG, "1", "2",
"-9223372036854775808"),
+ new Scenario(FieldSpec.DataType.FLOAT, "1", "2", "-Infinity"),
+ new Scenario(FieldSpec.DataType.DOUBLE, "1", "2", "-Infinity"),
+ new Scenario(FieldSpec.DataType.STRING, "a", "b", "\"null\""),
+ };
+ }
+
+ public class Scenario {
+ private final PinotDataType _pinotDataType;
+ private final FieldSpec.DataType _dataType;
+ private final String _valAsStr1;
+ private final String _valAsStr2;
+ private final String _defaultNullValue;
+
+ public Scenario(FieldSpec.DataType dataType, String valAsStr1, String
valAsStr2, String defaultNullValue) {
+ _dataType = dataType;
+ _valAsStr1 = valAsStr1;
+ _valAsStr2 = valAsStr2;
+ _defaultNullValue = defaultNullValue;
+ _pinotDataType =
+ _dataType == FieldSpec.DataType.INT ? PinotDataType.INTEGER :
PinotDataType.valueOf(_dataType.name());
+ }
+
+ public FluentQueryTest.DeclaringTable getDeclaringTable(boolean
nullHandlingEnabled) {
+ Schema schema = new Schema.SchemaBuilder()
+ .setSchemaName("testTable")
+ .setEnableColumnBasedNullHandling(true)
+ .addDimensionField("myField", _dataType, f -> f.setNullable(true))
+ .addDimensionField("timeField", FieldSpec.DataType.TIMESTAMP)
+ .build();
+ TableConfigBuilder tableConfigBuilder = new
TableConfigBuilder(TableType.OFFLINE)
+ .setTableName("testTable");
+
+ return FluentQueryTest.withBaseDir(_baseDir)
+ .withNullHandling(nullHandlingEnabled)
+ .givenTable(schema, tableConfigBuilder.build());
+ }
+
+ @Override
+ public String toString() {
+ return "Scenario{" + "dt=" + _dataType + ", val1='" + _valAsStr1 + '\''
+ ", val2='"
+ + _valAsStr2 + '\'' + '}';
+ }
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrWithoutNull(Scenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField | timeField",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ )
+ .whenQuery("select LAST_WITH_TIME(myField, timeField, '" +
scenario._dataType + "') from testTable")
+ .thenResultIs(scenario._pinotDataType.name(),
scenario._defaultNullValue);
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrWithNull(Scenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField | timeField",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ )
+ .whenQuery("select LAST_WITH_TIME(myField, timeField, '" +
scenario._dataType + "') from testTable")
+ .thenResultIs(scenario._pinotDataType.name(), scenario._valAsStr2);
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrSvWithoutNull(Scenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField | timeField",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ ).whenQuery("select 'cte', LAST_WITH_TIME(myField, timeField, '" +
scenario._dataType + "') as mode "
+ + "from testTable "
+ + "group by 'cte'")
+ .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " +
scenario._defaultNullValue);
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrSvWithNull(Scenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField | timeField",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ ).whenQuery("select 'cte', LAST_WITH_TIME(myField, timeField, '" +
scenario._dataType + "') as mode "
+ + "from testTable "
+ + "group by 'cte'")
+ .thenResultIs("STRING | " + scenario._pinotDataType.name(), "cte | " +
scenario._valAsStr2);
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrMvWithoutNull(Scenario scenario) {
+ scenario.getDeclaringTable(false)
+ .onFirstInstance("myField | timeField",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+ + "LAST_WITH_TIME(myField, timeField, '" + scenario._dataType +
"') as mode "
+ + "from testTable "
+ + "group by 'cte'")
+ .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+ "cte1 | cte2 | " + scenario._defaultNullValue);
+ }
+
+ @Test(dataProvider = "scenarios")
+ void aggrMvWithNull(Scenario scenario) {
+ scenario.getDeclaringTable(true)
+ .onFirstInstance("myField | timeField",
+ scenario._valAsStr1 + " | 2",
+ "null | 3"
+ ).andOnSecondInstance("myField | timeField",
+ scenario._valAsStr2 + " | 5",
+ "null | 6"
+ ).whenQuery("select 'cte1' as cte1, 'cte2' as cte2, "
+ + "LAST_WITH_TIME(myField, timeField, '" + scenario._dataType +
"') as mode "
+ + "from testTable "
+ + "group by 'cte'")
+ .thenResultIs("STRING | STRING | " + scenario._pinotDataType.name(),
+ "cte1 | cte2 | " + scenario._valAsStr2);
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
index 81e2ded523..d6c3cf82fb 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
@@ -35,6 +35,14 @@ public abstract class ValueLongPair<V extends Comparable<V>>
implements Comparab
return _time;
}
+ public void setValue(V value) {
+ _value = value;
+ }
+
+ public void setTime(long time) {
+ _time = time;
+ }
+
abstract public byte[] toBytes();
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]