This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 68899a30e4 [timeseries] Fix Time Series Query Correctness Issue (#14251)
68899a30e4 is described below
commit 68899a30e4848d787c85b3ae966f73fb755182f4
Author: Ankit Sultana <[email protected]>
AuthorDate: Mon Oct 21 11:03:07 2024 -0500
[timeseries] Fix Time Series Query Correctness Issue (#14251)
---
.../timeseries/TimeSeriesAggregationOperator.java | 81 ++++++++++-----------
.../core/query/executor/QueryExecutorTest.java | 20 +++--
.../src/test/resources/data/sampleEatsData.avro | Bin 551915 -> 0 bytes
.../src/test/resources/data/sampleEatsData30k.avro | Bin 0 -> 327494 bytes
4 files changed, 53 insertions(+), 48 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index a47a05e83a..6202cf808c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -82,55 +82,54 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
@Override
protected TimeSeriesResultsBlock getNextBlock() {
- ValueBlock transformBlock = _projectOperator.nextBlock();
- if (transformBlock == null) {
- TimeSeriesBuilderBlock builderBlock = new
TimeSeriesBuilderBlock(_timeBuckets, new HashMap<>());
- return new TimeSeriesResultsBlock(builderBlock);
- }
- BlockValSet blockValSet = transformBlock.getBlockValueSet(_timeColumn);
- long[] timeValues = blockValSet.getLongValuesSV();
- if (_timeOffset != null && _timeOffset != 0L) {
- timeValues = applyTimeshift(_timeOffset, timeValues);
- }
- int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
- Object[][] tagValues = new Object[_groupByExpressions.size()][];
+ ValueBlock valueBlock;
Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
- for (int i = 0; i < _groupByExpressions.size(); i++) {
- blockValSet =
transformBlock.getBlockValueSet(_groupByExpressions.get(i));
- switch (blockValSet.getValueType()) {
- case JSON:
- case STRING:
- tagValues[i] = blockValSet.getStringValuesSV();
- break;
+ while ((valueBlock = _projectOperator.nextBlock()) != null) {
+ // TODO: This is quite unoptimized and allocates liberally
+ BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn);
+ long[] timeValues = blockValSet.getLongValuesSV();
+ if (_timeOffset != null && _timeOffset != 0L) {
+ timeValues = applyTimeshift(_timeOffset, timeValues);
+ }
+ int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit);
+ Object[][] tagValues = new Object[_groupByExpressions.size()][];
+ for (int i = 0; i < _groupByExpressions.size(); i++) {
+ blockValSet = valueBlock.getBlockValueSet(_groupByExpressions.get(i));
+ switch (blockValSet.getValueType()) {
+ case JSON:
+ case STRING:
+ tagValues[i] = blockValSet.getStringValuesSV();
+ break;
+ case LONG:
+ tagValues[i] = ArrayUtils.toObject(blockValSet.getLongValuesSV());
+ break;
+ case INT:
+ tagValues[i] = ArrayUtils.toObject(blockValSet.getIntValuesSV());
+ break;
+ default:
+ throw new NotImplementedException("Can't handle types other than
string and long");
+ }
+ }
+ BlockValSet valueExpressionBlockValSet =
valueBlock.getBlockValueSet(_valueExpression);
+ switch (valueExpressionBlockValSet.getValueType()) {
case LONG:
- tagValues[i] = ArrayUtils.toObject(blockValSet.getLongValuesSV());
+ processLongExpression(valueExpressionBlockValSet, seriesBuilderMap,
timeValueIndexes, tagValues);
break;
case INT:
- tagValues[i] = ArrayUtils.toObject(blockValSet.getIntValuesSV());
+ processIntExpression(valueExpressionBlockValSet, seriesBuilderMap,
timeValueIndexes, tagValues);
+ break;
+ case DOUBLE:
+ processDoubleExpression(valueExpressionBlockValSet,
seriesBuilderMap, timeValueIndexes, tagValues);
+ break;
+ case STRING:
+ processStringExpression(valueExpressionBlockValSet,
seriesBuilderMap, timeValueIndexes, tagValues);
break;
default:
- throw new NotImplementedException("Can't handle types other than
string and long");
+ // TODO: Support other types?
+ throw new IllegalStateException(
+ "Don't yet support value expression of type: " +
valueExpressionBlockValSet.getValueType());
}
}
- BlockValSet valueExpressionBlockValSet =
transformBlock.getBlockValueSet(_valueExpression);
- switch (valueExpressionBlockValSet.getValueType()) {
- case LONG:
- processLongExpression(valueExpressionBlockValSet, seriesBuilderMap,
timeValueIndexes, tagValues);
- break;
- case INT:
- processIntExpression(valueExpressionBlockValSet, seriesBuilderMap,
timeValueIndexes, tagValues);
- break;
- case DOUBLE:
- processDoubleExpression(valueExpressionBlockValSet, seriesBuilderMap,
timeValueIndexes, tagValues);
- break;
- case STRING:
- processStringExpression(valueExpressionBlockValSet, seriesBuilderMap,
timeValueIndexes, tagValues);
- break;
- default:
- // TODO: Support other types?
- throw new IllegalStateException(
- "Don't yet support value expression of type: " +
valueExpressionBlockValSet.getValueType());
- }
return new TimeSeriesResultsBlock(new TimeSeriesBuilderBlock(_timeBuckets,
seriesBuilderMap));
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index f7f747a170..457e916b6d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -86,7 +86,7 @@ import static org.testng.Assert.assertTrue;
public class QueryExecutorTest {
- private static final String AVRO_DATA_PATH = "data/sampleEatsData.avro";
+ private static final String AVRO_DATA_PATH = "data/sampleEatsData30k.avro";
private static final String EMPTY_JSON_DATA_PATH =
"data/test_empty_data.json";
private static final String QUERY_EXECUTOR_CONFIG_PATH =
"conf/query-executor.properties";
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"QueryExecutorTest");
@@ -182,7 +182,7 @@ public class QueryExecutorTest {
instanceRequest.setSearchSegments(_segmentNames);
InstanceResponseBlock instanceResponse =
_queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
assertTrue(instanceResponse.getResultsBlock() instanceof
AggregationResultsBlock);
- assertEquals(((AggregationResultsBlock)
instanceResponse.getResultsBlock()).getResults().get(0), 20000L);
+ assertEquals(((AggregationResultsBlock)
instanceResponse.getResultsBlock()).getResults().get(0), 60000L);
}
@Test
@@ -192,7 +192,7 @@ public class QueryExecutorTest {
instanceRequest.setSearchSegments(_segmentNames);
InstanceResponseBlock instanceResponse =
_queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
assertTrue(instanceResponse.getResultsBlock() instanceof
AggregationResultsBlock);
- assertEquals(((AggregationResultsBlock)
instanceResponse.getResultsBlock()).getResults().get(0), 40102.0);
+ assertEquals(((AggregationResultsBlock)
instanceResponse.getResultsBlock()).getResults().get(0), 120306.0);
}
@Test
@@ -217,19 +217,20 @@ public class QueryExecutorTest {
@Test
public void testTimeSeriesSumQuery() {
- TimeBuckets timeBuckets =
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
+ TimeBuckets timeBuckets =
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofHours(2), 1);
ExpressionContext valueExpression =
ExpressionContext.forIdentifier("orderAmount");
TimeSeriesContext timeSeriesContext =
new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME,
TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", null));
- QueryContext queryContext =
getQueryContextForTimeSeries(timeSeriesContext);
+ QueryContext queryContext =
getQueryContextForTimeSeries(timeSeriesContext, Collections.emptyList());
ServerQueryRequest serverQueryRequest =
new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(),
ServerMetrics.get());
InstanceResponseBlock instanceResponse =
_queryExecutor.execute(serverQueryRequest, QUERY_RUNNERS);
assertTrue(instanceResponse.getResultsBlock() instanceof
TimeSeriesResultsBlock);
TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock)
instanceResponse.getResultsBlock();
TimeSeriesBlock timeSeriesBlock =
resultsBlock.getTimeSeriesBuilderBlock().build();
- assertEquals(5, timeSeriesBlock.getSeriesMap().size());
+ assertEquals(timeSeriesBlock.getSeriesMap().size(), 1);
+
assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0],
29885544.0);
}
@Test
@@ -309,12 +310,17 @@ public class QueryExecutorTest {
}
private QueryContext getQueryContextForTimeSeries(TimeSeriesContext context)
{
+ return getQueryContextForTimeSeries(context, Collections.singletonList(
+ ExpressionContext.forIdentifier("cityName")));
+ }
+
+ private QueryContext getQueryContextForTimeSeries(TimeSeriesContext context,
List<ExpressionContext> groupBy) {
QueryContext.Builder builder = new QueryContext.Builder();
builder.setTableName(OFFLINE_TABLE_NAME);
builder.setTimeSeriesContext(context);
builder.setAliasList(Collections.emptyList());
builder.setSelectExpressions(Collections.emptyList());
-
builder.setGroupByExpressions(Collections.singletonList(ExpressionContext.forIdentifier("cityName")));
+ builder.setGroupByExpressions(groupBy);
return builder.build();
}
}
diff --git a/pinot-core/src/test/resources/data/sampleEatsData.avro b/pinot-core/src/test/resources/data/sampleEatsData.avro
deleted file mode 100644
index 0dcc7928ef..0000000000
Binary files a/pinot-core/src/test/resources/data/sampleEatsData.avro and /dev/null differ
diff --git a/pinot-core/src/test/resources/data/sampleEatsData30k.avro b/pinot-core/src/test/resources/data/sampleEatsData30k.avro
new file mode 100644
index 0000000000..e63388614c
Binary files /dev/null and b/pinot-core/src/test/resources/data/sampleEatsData30k.avro differ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]