This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 702dfc3692 [timeseries] Use Num Docs from Value Block in Time Series Agg (#14331)
702dfc3692 is described below
commit 702dfc3692b51422f26a40ef5b6700cf1fe9ca2c
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Oct 29 18:24:32 2024 -0500
[timeseries] Use Num Docs from Value Block in Time Series Agg (#14331)
---
.../timeseries/TimeSeriesAggregationOperator.java | 49 +++++++++++-----------
1 file changed, 25 insertions(+), 24 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index 6202cf808c..25ee168ef8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -85,13 +85,14 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
ValueBlock valueBlock;
Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
while ((valueBlock = _projectOperator.nextBlock()) != null) {
+ int numDocs = valueBlock.getNumDocs();
// TODO: This is quite unoptimized and allocates liberally
BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn);
long[] timeValues = blockValSet.getLongValuesSV();
if (_timeOffset != null && _timeOffset != 0L) {
- timeValues = applyTimeshift(_timeOffset, timeValues);
+ timeValues = applyTimeshift(_timeOffset, timeValues, numDocs);
}
- int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
+ int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit,
numDocs);
Object[][] tagValues = new Object[_groupByExpressions.size()][];
for (int i = 0; i < _groupByExpressions.size(); i++) {
blockValSet = valueBlock.getBlockValueSet(_groupByExpressions.get(i));
@@ -113,16 +114,16 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
BlockValSet valueExpressionBlockValSet =
valueBlock.getBlockValueSet(_valueExpression);
switch (valueExpressionBlockValSet.getValueType()) {
case LONG:
- processLongExpression(valueExpressionBlockValSet, seriesBuilderMap,
timeValueIndexes, tagValues);
+ processLongExpression(valueExpressionBlockValSet, seriesBuilderMap,
timeValueIndexes, tagValues, numDocs);
break;
case INT:
- processIntExpression(valueExpressionBlockValSet, seriesBuilderMap,
timeValueIndexes, tagValues);
+ processIntExpression(valueExpressionBlockValSet, seriesBuilderMap,
timeValueIndexes, tagValues, numDocs);
break;
case DOUBLE:
- processDoubleExpression(valueExpressionBlockValSet,
seriesBuilderMap, timeValueIndexes, tagValues);
+ processDoubleExpression(valueExpressionBlockValSet,
seriesBuilderMap, timeValueIndexes, tagValues, numDocs);
break;
case STRING:
- processStringExpression(valueExpressionBlockValSet,
seriesBuilderMap, timeValueIndexes, tagValues);
+ processStringExpression(valueExpressionBlockValSet,
seriesBuilderMap, timeValueIndexes, tagValues, numDocs);
break;
default:
// TODO: Support other types?
@@ -151,21 +152,21 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
return new ExecutionStatistics(0, 0, 0, 0);
}
- private int[] getTimeValueIndex(long[] actualTimeValues, TimeUnit timeUnit) {
+ private int[] getTimeValueIndex(long[] actualTimeValues, TimeUnit timeUnit,
int numDocs) {
if (timeUnit == TimeUnit.MILLISECONDS) {
- return getTimeValueIndexMillis(actualTimeValues);
+ return getTimeValueIndexMillis(actualTimeValues, numDocs);
}
- int[] timeIndexes = new int[actualTimeValues.length];
- for (int index = 0; index < actualTimeValues.length; index++) {
+ int[] timeIndexes = new int[numDocs];
+ for (int index = 0; index < numDocs; index++) {
timeIndexes[index] = (int) ((actualTimeValues[index] -
_timeBuckets.getStartTime())
/ _timeBuckets.getBucketSize().getSeconds());
}
return timeIndexes;
}
- private int[] getTimeValueIndexMillis(long[] actualTimeValues) {
- int[] timeIndexes = new int[actualTimeValues.length];
- for (int index = 0; index < actualTimeValues.length; index++) {
+ private int[] getTimeValueIndexMillis(long[] actualTimeValues, int numDocs) {
+ int[] timeIndexes = new int[numDocs];
+ for (int index = 0; index < numDocs; index++) {
timeIndexes[index] = (int) ((actualTimeValues[index] -
_timeBuckets.getStartTime() * 1000L)
/ _timeBuckets.getBucketSize().toMillis());
}
@@ -173,9 +174,9 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
}
public void processLongExpression(BlockValSet blockValSet, Map<Long,
BaseTimeSeriesBuilder> seriesBuilderMap,
- int[] timeValueIndexes, Object[][] tagValues) {
+ int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
long[] valueColumnValues = blockValSet.getLongValuesSV();
- for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length;
docIdIndex++) {
+ for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
@@ -190,9 +191,9 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
}
public void processIntExpression(BlockValSet blockValSet, Map<Long,
BaseTimeSeriesBuilder> seriesBuilderMap,
- int[] timeValueIndexes, Object[][] tagValues) {
+ int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
int[] valueColumnValues = blockValSet.getIntValuesSV();
- for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length;
docIdIndex++) {
+ for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
@@ -207,9 +208,9 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
}
public void processDoubleExpression(BlockValSet blockValSet, Map<Long,
BaseTimeSeriesBuilder> seriesBuilderMap,
- int[] timeValueIndexes, Object[][] tagValues) {
+ int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
double[] valueColumnValues = blockValSet.getDoubleValuesSV();
- for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length;
docIdIndex++) {
+ for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
@@ -224,9 +225,9 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
}
public void processStringExpression(BlockValSet blockValSet, Map<Long,
BaseTimeSeriesBuilder> seriesBuilderMap,
- int[] timeValueIndexes, Object[][] tagValues) {
+ int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
String[] valueColumnValues = blockValSet.getStringValuesSV();
- for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length;
docIdIndex++) {
+ for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
@@ -239,12 +240,12 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
}
}
- public static long[] applyTimeshift(long timeshift, long[] timeValues) {
+ public static long[] applyTimeshift(long timeshift, long[] timeValues, int
numDocs) {
if (timeshift == 0) {
return timeValues;
}
- long[] shiftedTimeValues = new long[timeValues.length];
- for (int index = 0; index < timeValues.length; index++) {
+ long[] shiftedTimeValues = new long[numDocs];
+ for (int index = 0; index < numDocs; index++) {
shiftedTimeValues[index] = timeValues[index] + timeshift;
}
return shiftedTimeValues;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]