This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 159fc6716e [timeseries] Add Support for Passing Raw Time Values to
Leaf Stage (#15000)
159fc6716e is described below
commit 159fc6716efa1cd50dfa9d7f59ab4c7092e38935
Author: Ankit Sultana <[email protected]>
AuthorDate: Thu Feb 6 23:22:31 2025 -0600
[timeseries] Add Support for Passing Raw Time Values to Leaf Stage (#15000)
---
.../function/TimeSeriesAggregationFunction.java | 99 ++++++++++++++++------
.../core/query/executor/QueryExecutorTest.java | 6 +-
.../PhysicalTimeSeriesServerPlanVisitor.java | 7 +-
.../tsdb/spi/series/BaseTimeSeriesBuilder.java | 17 ++++
4 files changed, 94 insertions(+), 35 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java
index 79feb866cb..239fa4a556 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/TimeSeriesAggregationFunction.java
@@ -24,7 +24,9 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.request.Literal;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
@@ -44,8 +46,33 @@ import
org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
/**
- * Aggregation function used by the Time Series Engine.
- * TODO: This can't be used with SQL because the Object Serde is not
implemented.
+ * Aggregation function used by the Time Series Engine. This can't be used
with SQL because the Object Serde is not yet
+ * implemented, though we don't plan on exposing this function anytime soon.
+ * <h2>Converting Time Values to Bucket Indexes</h2>
+ * <p>
+ * This aggregation function will map each scanned data point to a time
bucket index. This is done using the
+ * formula: {@code ((timeValue + timeOffset) - timeReferencePoint - 1) /
bucketSize}. The entire calculation is done
+ * in the Time Unit (seconds, ms, etc.) of the timeValue returned by the
time expression chosen by the user.
+ * The method used to add values to the series builders is:
+ * {@link BaseTimeSeriesBuilder#addValueAtIndex(int, Double, long)}.
+ * </p>
+ * <p>
+ * The formula originates from the fact that we use half-open time
intervals, which are open on the left.
+ * The timeReferencePoint is usually the start of the time-range being
scanned. Assuming everything is in seconds,
+ * the time buckets can generally be thought of as looking something like the
following:
+ * <pre>
+ * (timeReferencePoint, timeReferencePoint + bucketSize]
+ * (timeReferencePoint + bucketSize, timeReferencePoint + 2 * bucketSize]
+ * ...
+ * (timeReferencePoint + (numBuckets - 1) * bucketSize, timeReferencePoint
+ numBuckets * bucketSize]
+ * </pre>
+ * </p>
+ * <p>
+ * Also, note that the timeReferencePoint is simply calculated as follows:
+ * <pre>
+ * timeReferencePointInSeconds = firstBucketValue - bucketSizeInSeconds
+ * </pre>
+ * </p>
*/
public class TimeSeriesAggregationFunction implements
AggregationFunction<BaseTimeSeriesBuilder, DoubleArrayList> {
private final TimeSeriesBuilderFactory _factory;
@@ -53,32 +80,40 @@ public class TimeSeriesAggregationFunction implements
AggregationFunction<BaseTi
private final ExpressionContext _valueExpression;
private final ExpressionContext _timeExpression;
private final TimeBuckets _timeBuckets;
+ private final long _timeReferencePoint;
+ private final long _timeOffset;
+ private final long _timeBucketDivisor;
/**
* Arguments are as shown below:
* <pre>
- * timeSeriesAggregate("m3ql", "MIN", valueExpr, timeBucketExpr,
firstBucketValue, bucketLenSeconds, numBuckets,
- * "aggParam1=value1")
+ * timeSeriesAggregate("m3ql", "MIN", valueExpr, timeExpr, timeUnit,
offsetSeconds, firstBucketValue,
+ * bucketLenSeconds, numBuckets, "aggParam1=value1")
* </pre>
*/
public TimeSeriesAggregationFunction(List<ExpressionContext> arguments) {
- // Initialize everything
- Preconditions.checkArgument(arguments.size() == 8, "Expected 8 arguments
for time-series agg");
+ // Initialize temporary variables.
+ Preconditions.checkArgument(arguments.size() == 10, "Expected 10 arguments
for time-series agg");
String language = arguments.get(0).getLiteral().getStringValue();
String aggFunctionName = arguments.get(1).getLiteral().getStringValue();
ExpressionContext valueExpression = arguments.get(2);
- ExpressionContext bucketIndexReturningExpr = arguments.get(3);
- long firstBucketValue = arguments.get(4).getLiteral().getLongValue();
- long bucketWindowSeconds = arguments.get(5).getLiteral().getLongValue();
- int numBuckets = arguments.get(6).getLiteral().getIntValue();
- Map<String, String> aggParams =
AggInfo.deserializeParams(arguments.get(7).getLiteral().getStringValue());
+ ExpressionContext timeExpression = arguments.get(3);
+ TimeUnit timeUnit =
TimeUnit.valueOf(arguments.get(4).getLiteral().getStringValue().toUpperCase(Locale.ENGLISH));
+ long offsetSeconds = arguments.get(5).getLiteral().getLongValue();
+ long firstBucketValue = arguments.get(6).getLiteral().getLongValue();
+ long bucketWindowSeconds = arguments.get(7).getLiteral().getLongValue();
+ int numBuckets = arguments.get(8).getLiteral().getIntValue();
+ Map<String, String> aggParams =
AggInfo.deserializeParams(arguments.get(9).getLiteral().getStringValue());
AggInfo aggInfo = new AggInfo(aggFunctionName, true /* is partial agg */,
aggParams);
// Set all values
_factory =
TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(language);
_valueExpression = valueExpression;
- _timeExpression = bucketIndexReturningExpr;
+ _timeExpression = timeExpression;
_timeBuckets = TimeBuckets.ofSeconds(firstBucketValue,
Duration.ofSeconds(bucketWindowSeconds), numBuckets);
_aggInfo = aggInfo;
+ _timeReferencePoint = timeUnit.convert(Duration.ofSeconds(firstBucketValue
- bucketWindowSeconds));
+ _timeOffset = timeUnit.convert(Duration.ofSeconds(offsetSeconds));
+ _timeBucketDivisor = timeUnit.convert(_timeBuckets.getBucketSize());
}
@Override
@@ -109,16 +144,16 @@ public class TimeSeriesAggregationFunction implements
AggregationFunction<BaseTi
@Override
public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
- int[] timeIndexes = blockValSetMap.get(_timeExpression).getIntValuesSV();
+ final long[] timeValues =
blockValSetMap.get(_timeExpression).getLongValuesSV();
BlockValSet valueBlockValSet = blockValSetMap.get(_valueExpression);
switch (valueBlockValSet.getValueType()) {
case DOUBLE:
case LONG:
case INT:
- aggregateNumericValues(length, timeIndexes, aggregationResultHolder,
valueBlockValSet);
+ aggregateNumericValues(length, timeValues, aggregationResultHolder,
valueBlockValSet);
break;
case STRING:
- aggregateStringValues(length, timeIndexes, aggregationResultHolder,
valueBlockValSet);
+ aggregateStringValues(length, timeValues, aggregationResultHolder,
valueBlockValSet);
break;
default:
throw new UnsupportedOperationException(String.format("Unsupported
type: %s in aggregate",
@@ -129,16 +164,16 @@ public class TimeSeriesAggregationFunction implements
AggregationFunction<BaseTi
@Override
public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
- final int[] timeIndexes =
blockValSetMap.get(_timeExpression).getIntValuesSV();
+ final long[] timeValues =
blockValSetMap.get(_timeExpression).getLongValuesSV();
BlockValSet valueBlockValSet = blockValSetMap.get(_valueExpression);
switch (valueBlockValSet.getValueType()) {
case DOUBLE:
case LONG:
case INT:
- aggregateGroupByNumericValues(length, groupKeyArray, timeIndexes,
groupByResultHolder, valueBlockValSet);
+ aggregateGroupByNumericValues(length, groupKeyArray, timeValues,
groupByResultHolder, valueBlockValSet);
break;
case STRING:
- aggregateGroupByStringValues(length, groupKeyArray, timeIndexes,
groupByResultHolder, valueBlockValSet);
+ aggregateGroupByStringValues(length, groupKeyArray, timeValues,
groupByResultHolder, valueBlockValSet);
break;
default:
throw new UnsupportedOperationException(String.format("Unsupported
type: %s in aggregate",
@@ -189,7 +224,7 @@ public class TimeSeriesAggregationFunction implements
AggregationFunction<BaseTi
return "TIME_SERIES";
}
- private void aggregateNumericValues(int length, int[] timeIndexes,
AggregationResultHolder resultHolder,
+ private void aggregateNumericValues(int length, long[] timeValues,
AggregationResultHolder resultHolder,
BlockValSet blockValSet) {
double[] values = blockValSet.getDoubleValuesSV();
BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult();
@@ -198,12 +233,14 @@ public class TimeSeriesAggregationFunction implements
AggregationFunction<BaseTi
BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES,
BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
resultHolder.setValue(currentSeriesBuilder);
}
+ int timeIndex;
for (int docIndex = 0; docIndex < length; docIndex++) {
- currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex],
values[docIndex]);
+ timeIndex = (int) (((timeValues[docIndex] + _timeOffset) -
_timeReferencePoint - 1) / _timeBucketDivisor);
+ currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex],
timeValues[docIndex]);
}
}
- private void aggregateStringValues(int length, int[] timeIndexes,
AggregationResultHolder resultHolder,
+ private void aggregateStringValues(int length, long[] timeValues,
AggregationResultHolder resultHolder,
BlockValSet blockValSet) {
String[] values = blockValSet.getStringValuesSV();
BaseTimeSeriesBuilder currentSeriesBuilder = resultHolder.getResult();
@@ -212,14 +249,17 @@ public class TimeSeriesAggregationFunction implements
AggregationFunction<BaseTi
BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES,
BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
resultHolder.setValue(currentSeriesBuilder);
}
+ int timeIndex;
for (int docIndex = 0; docIndex < length; docIndex++) {
- currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex],
values[docIndex]);
+ timeIndex = (int) (((timeValues[docIndex] + _timeOffset) -
_timeReferencePoint - 1) / _timeBucketDivisor);
+ currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex],
timeValues[docIndex]);
}
}
- private void aggregateGroupByNumericValues(int length, int[] groupKeyArray,
int[] timeIndexes,
+ private void aggregateGroupByNumericValues(int length, int[] groupKeyArray,
long[] timeValues,
GroupByResultHolder resultHolder, BlockValSet blockValSet) {
final double[] values = blockValSet.getDoubleValuesSV();
+ int timeIndex;
for (int docIndex = 0; docIndex < length; docIndex++) {
int groupId = groupKeyArray[docIndex];
BaseTimeSeriesBuilder currentSeriesBuilder =
resultHolder.getResult(groupId);
@@ -228,13 +268,15 @@ public class TimeSeriesAggregationFunction implements
AggregationFunction<BaseTi
BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES,
BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
resultHolder.setValueForKey(groupId, currentSeriesBuilder);
}
- currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex],
values[docIndex]);
+ timeIndex = (int) (((timeValues[docIndex] + _timeOffset) -
_timeReferencePoint - 1) / _timeBucketDivisor);
+ currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex],
timeValues[docIndex]);
}
}
- private void aggregateGroupByStringValues(int length, int[] groupKeyArray,
int[] timeIndexes,
+ private void aggregateGroupByStringValues(int length, int[] groupKeyArray,
long[] timeValues,
GroupByResultHolder resultHolder, BlockValSet blockValSet) {
final String[] values = blockValSet.getStringValuesSV();
+ int timeIndex;
for (int docIndex = 0; docIndex < length; docIndex++) {
int groupId = groupKeyArray[docIndex];
BaseTimeSeriesBuilder currentSeriesBuilder =
resultHolder.getResult(groupId);
@@ -243,18 +285,21 @@ public class TimeSeriesAggregationFunction implements
AggregationFunction<BaseTi
BaseTimeSeriesBuilder.UNINITIALISED_TAG_NAMES,
BaseTimeSeriesBuilder.UNINITIALISED_TAG_VALUES);
resultHolder.setValueForKey(groupId, currentSeriesBuilder);
}
- currentSeriesBuilder.addValueAtIndex(timeIndexes[docIndex],
values[docIndex]);
+ timeIndex = (int) (((timeValues[docIndex] + _timeOffset) -
_timeReferencePoint - 1) / _timeBucketDivisor);
+ currentSeriesBuilder.addValueAtIndex(timeIndex, values[docIndex],
timeValues[docIndex]);
}
}
public static ExpressionContext create(String language, String
valueExpressionStr, ExpressionContext timeExpression,
- TimeBuckets timeBuckets, AggInfo aggInfo) {
+ TimeUnit timeUnit, long offsetSeconds, TimeBuckets timeBuckets, AggInfo
aggInfo) {
ExpressionContext valueExpression =
RequestContextUtils.getExpression(valueExpressionStr);
List<ExpressionContext> arguments = new ArrayList<>();
arguments.add(ExpressionContext.forLiteral(Literal.stringValue(language)));
arguments.add(ExpressionContext.forLiteral(Literal.stringValue(aggInfo.getAggFunction())));
arguments.add(valueExpression);
arguments.add(timeExpression);
+
arguments.add(ExpressionContext.forLiteral(Literal.stringValue(timeUnit.toString())));
+
arguments.add(ExpressionContext.forLiteral(Literal.longValue(offsetSeconds)));
arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getTimeBuckets()[0])));
arguments.add(ExpressionContext.forLiteral(Literal.longValue(timeBuckets.getBucketSize().getSeconds())));
arguments.add(ExpressionContext.forLiteral(Literal.intValue(timeBuckets.getNumBuckets())));
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 04a908e697..095a1c0a89 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -47,7 +47,6 @@ import
org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
import org.apache.pinot.core.operator.timeseries.TimeSeriesOperatorUtils;
-import
org.apache.pinot.core.operator.transform.function.TimeSeriesBucketTransformFunction;
import
org.apache.pinot.core.query.aggregation.function.TimeSeriesAggregationFunction;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -311,10 +310,9 @@ public class QueryExecutorTest {
AggInfo aggInfo, List<String> groupBy) {
List<ExpressionContext> groupByExpList =
groupBy.stream().map(RequestContextUtils::getExpression)
.collect(Collectors.toList());
- ExpressionContext timeExpression =
TimeSeriesBucketTransformFunction.create(TIME_SERIES_TIME_COL_NAME,
- TimeUnit.SECONDS, timeBuckets, offsetSeconds);
+ ExpressionContext timeExpression =
RequestContextUtils.getExpression(TIME_SERIES_TIME_COL_NAME);
ExpressionContext aggregateExpr =
TimeSeriesAggregationFunction.create(TIME_SERIES_LANGUAGE_NAME, valueExpression,
- timeExpression, timeBuckets, aggInfo);
+ timeExpression, TimeUnit.SECONDS, offsetSeconds, timeBuckets, aggInfo);
QueryContext.Builder builder = new QueryContext.Builder();
builder.setTableName(OFFLINE_TABLE_NAME);
builder.setAliasList(Collections.emptyList());
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
index 72857d403b..27918be35f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
@@ -30,7 +30,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.RequestContextUtils;
-import
org.apache.pinot.core.operator.transform.function.TimeSeriesBucketTransformFunction;
import
org.apache.pinot.core.query.aggregation.function.TimeSeriesAggregationFunction;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -106,10 +105,10 @@ public class PhysicalTimeSeriesServerPlanVisitor {
List<ExpressionContext> groupByExpressions =
leafNode.getGroupByExpressions().stream()
.map(RequestContextUtils::getExpression).collect(Collectors.toList());
TimeBuckets timeBuckets = context.getInitialTimeBuckets();
- ExpressionContext timeTransform =
TimeSeriesBucketTransformFunction.create(leafNode.getTimeColumn(),
- leafNode.getTimeUnit(), timeBuckets, leafNode.getOffsetSeconds() ==
null ? 0 : leafNode.getOffsetSeconds());
+ ExpressionContext rawTimeValuesInLong =
RequestContextUtils.getExpression(leafNode.getTimeColumn());
ExpressionContext aggregation =
TimeSeriesAggregationFunction.create(context.getLanguage(),
- leafNode.getValueExpression(), timeTransform, timeBuckets,
leafNode.getAggInfo());
+ leafNode.getValueExpression(), rawTimeValuesInLong,
leafNode.getTimeUnit(),
+ leafNode.getOffsetSeconds() == null ? 0 : leafNode.getOffsetSeconds(),
timeBuckets, leafNode.getAggInfo());
Map<String, String> queryOptions = new
HashMap<>(leafNode.getQueryOptions());
queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(Math.max(0L,
context.getRemainingTimeMs())));
return new QueryContext.Builder()
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
index 84239e1217..0a5e4753fc 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
/**
@@ -57,10 +58,26 @@ public abstract class BaseTimeSeriesBuilder {
public abstract void addValueAtIndex(int timeBucketIndex, Double value);
+ /**
+ * This is the method called by Pinot's leaf stage to accumulate data in the
series builders. Pinot's leaf stage
+ * passes the raw time value to allow languages to build complex series
builders. For instance, PromQL relies on
+ * the first and last time value in each time bucket for certain functions.
+ * <p>
+ * The rawTimeValue is in the same Time Unit as that passed to the {@link
LeafTimeSeriesPlanNode}.
+ * </p>
+ */
+ public void addValueAtIndex(int timeBucketIndex, Double value, long
rawTimeValue) {
+ addValueAtIndex(timeBucketIndex, value);
+ }
+
public void addValueAtIndex(int timeBucketIndex, String value) {
throw new UnsupportedOperationException("This aggregation function does
not support string input");
}
+ public void addValueAtIndex(int timeBucketIndex, String value, long
rawTimeValue) {
+ addValueAtIndex(timeBucketIndex, value);
+ }
+
public abstract void addValue(long timeValue, Double value);
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]