This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 702dfc3692 [timeseries] Use Num Docs from Value Block in Time Series Agg (#14331)
702dfc3692 is described below

commit 702dfc3692b51422f26a40ef5b6700cf1fe9ca2c
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Oct 29 18:24:32 2024 -0500

    [timeseries] Use Num Docs from Value Block in Time Series Agg (#14331)
---
 .../timeseries/TimeSeriesAggregationOperator.java  | 49 +++++++++++-----------
 1 file changed, 25 insertions(+), 24 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index 6202cf808c..25ee168ef8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -85,13 +85,14 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
     ValueBlock valueBlock;
     Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
     while ((valueBlock = _projectOperator.nextBlock()) != null) {
+      int numDocs = valueBlock.getNumDocs();
       // TODO: This is quite unoptimized and allocates liberally
       BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn);
       long[] timeValues = blockValSet.getLongValuesSV();
       if (_timeOffset != null && _timeOffset != 0L) {
-        timeValues = applyTimeshift(_timeOffset, timeValues);
+        timeValues = applyTimeshift(_timeOffset, timeValues, numDocs);
       }
-      int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
+      int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit, 
numDocs);
       Object[][] tagValues = new Object[_groupByExpressions.size()][];
       for (int i = 0; i < _groupByExpressions.size(); i++) {
         blockValSet = valueBlock.getBlockValueSet(_groupByExpressions.get(i));
@@ -113,16 +114,16 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
       BlockValSet valueExpressionBlockValSet = 
valueBlock.getBlockValueSet(_valueExpression);
       switch (valueExpressionBlockValSet.getValueType()) {
         case LONG:
-          processLongExpression(valueExpressionBlockValSet, seriesBuilderMap, 
timeValueIndexes, tagValues);
+          processLongExpression(valueExpressionBlockValSet, seriesBuilderMap, 
timeValueIndexes, tagValues, numDocs);
           break;
         case INT:
-          processIntExpression(valueExpressionBlockValSet, seriesBuilderMap, 
timeValueIndexes, tagValues);
+          processIntExpression(valueExpressionBlockValSet, seriesBuilderMap, 
timeValueIndexes, tagValues, numDocs);
           break;
         case DOUBLE:
-          processDoubleExpression(valueExpressionBlockValSet, 
seriesBuilderMap, timeValueIndexes, tagValues);
+          processDoubleExpression(valueExpressionBlockValSet, 
seriesBuilderMap, timeValueIndexes, tagValues, numDocs);
           break;
         case STRING:
-          processStringExpression(valueExpressionBlockValSet, 
seriesBuilderMap, timeValueIndexes, tagValues);
+          processStringExpression(valueExpressionBlockValSet, 
seriesBuilderMap, timeValueIndexes, tagValues, numDocs);
           break;
         default:
           // TODO: Support other types?
@@ -151,21 +152,21 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
     return new ExecutionStatistics(0, 0, 0, 0);
   }
 
-  private int[] getTimeValueIndex(long[] actualTimeValues, TimeUnit timeUnit) {
+  private int[] getTimeValueIndex(long[] actualTimeValues, TimeUnit timeUnit, 
int numDocs) {
     if (timeUnit == TimeUnit.MILLISECONDS) {
-      return getTimeValueIndexMillis(actualTimeValues);
+      return getTimeValueIndexMillis(actualTimeValues, numDocs);
     }
-    int[] timeIndexes = new int[actualTimeValues.length];
-    for (int index = 0; index < actualTimeValues.length; index++) {
+    int[] timeIndexes = new int[numDocs];
+    for (int index = 0; index < numDocs; index++) {
       timeIndexes[index] = (int) ((actualTimeValues[index] - 
_timeBuckets.getStartTime())
           / _timeBuckets.getBucketSize().getSeconds());
     }
     return timeIndexes;
   }
 
-  private int[] getTimeValueIndexMillis(long[] actualTimeValues) {
-    int[] timeIndexes = new int[actualTimeValues.length];
-    for (int index = 0; index < actualTimeValues.length; index++) {
+  private int[] getTimeValueIndexMillis(long[] actualTimeValues, int numDocs) {
+    int[] timeIndexes = new int[numDocs];
+    for (int index = 0; index < numDocs; index++) {
       timeIndexes[index] = (int) ((actualTimeValues[index] - 
_timeBuckets.getStartTime() * 1000L)
           / _timeBuckets.getBucketSize().toMillis());
     }
@@ -173,9 +174,9 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
   }
 
   public void processLongExpression(BlockValSet blockValSet, Map<Long, 
BaseTimeSeriesBuilder> seriesBuilderMap,
-      int[] timeValueIndexes, Object[][] tagValues) {
+      int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
     long[] valueColumnValues = blockValSet.getLongValuesSV();
-    for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length; 
docIdIndex++) {
+    for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
       Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
       for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
         tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
@@ -190,9 +191,9 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
   }
 
   public void processIntExpression(BlockValSet blockValSet, Map<Long, 
BaseTimeSeriesBuilder> seriesBuilderMap,
-      int[] timeValueIndexes, Object[][] tagValues) {
+      int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
     int[] valueColumnValues = blockValSet.getIntValuesSV();
-    for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length; 
docIdIndex++) {
+    for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
       Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
       for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
         tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
@@ -207,9 +208,9 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
   }
 
   public void processDoubleExpression(BlockValSet blockValSet, Map<Long, 
BaseTimeSeriesBuilder> seriesBuilderMap,
-      int[] timeValueIndexes, Object[][] tagValues) {
+      int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
     double[] valueColumnValues = blockValSet.getDoubleValuesSV();
-    for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length; 
docIdIndex++) {
+    for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
       Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
       for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
         tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
@@ -224,9 +225,9 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
   }
 
   public void processStringExpression(BlockValSet blockValSet, Map<Long, 
BaseTimeSeriesBuilder> seriesBuilderMap,
-      int[] timeValueIndexes, Object[][] tagValues) {
+      int[] timeValueIndexes, Object[][] tagValues, int numDocs) {
     String[] valueColumnValues = blockValSet.getStringValuesSV();
-    for (int docIdIndex = 0; docIdIndex < timeValueIndexes.length; 
docIdIndex++) {
+    for (int docIdIndex = 0; docIdIndex < numDocs; docIdIndex++) {
       Object[] tagValuesForDoc = new Object[_groupByExpressions.size()];
       for (int tagIndex = 0; tagIndex < tagValues.length; tagIndex++) {
         tagValuesForDoc[tagIndex] = tagValues[tagIndex][docIdIndex];
@@ -239,12 +240,12 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
     }
   }
 
-  public static long[] applyTimeshift(long timeshift, long[] timeValues) {
+  public static long[] applyTimeshift(long timeshift, long[] timeValues, int 
numDocs) {
     if (timeshift == 0) {
       return timeValues;
     }
-    long[] shiftedTimeValues = new long[timeValues.length];
-    for (int index = 0; index < timeValues.length; index++) {
+    long[] shiftedTimeValues = new long[numDocs];
+    for (int index = 0; index < numDocs; index++) {
       shiftedTimeValues[index] = timeValues[index] + timeshift;
     }
     return shiftedTimeValues;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to