This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4ba11e52a7 Part-5: Fix Offset Handling and Effective Time Filter +
Enable Group-By Expressions + Add Unit Tests and Minor Cleanup (#14104)
4ba11e52a7 is described below
commit 4ba11e52a76252082772529acb8fa37e2d12fb00
Author: Ankit Sultana <[email protected]>
AuthorDate: Mon Sep 30 12:57:13 2024 -0500
Part-5: Fix Offset Handling and Effective Time Filter + Enable Group-By
Expressions + Add Unit Tests and Minor Cleanup (#14104)
---
.../common/request/context/TimeSeriesContext.java | 10 +--
.../timeseries/TimeSeriesAggregationOperator.java | 5 +-
.../apache/pinot/core/plan/CombinePlanNode.java | 2 +-
.../apache/pinot/core/plan/TimeSeriesPlanNode.java | 2 +-
.../core/query/executor/QueryExecutorTest.java | 10 +--
.../timeseries/PhysicalTimeSeriesPlanVisitor.java | 28 ++++----
.../PhysicalTimeSeriesPlanVisitorTest.java | 80 ++++++++++++++++++++++
.../tsdb/spi/plan/LeafTimeSeriesPlanNode.java | 32 ++++-----
.../tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java | 78 +++++++++++++++++++++
.../spi/plan/serde/TimeSeriesPlanSerdeTest.java | 4 +-
10 files changed, 204 insertions(+), 47 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java
index 2290a617cc..ba7858ea11 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java
@@ -24,7 +24,7 @@ import org.apache.pinot.tsdb.spi.TimeBuckets;
public class TimeSeriesContext {
- private final String _engine;
+ private final String _language;
private final String _timeColumn;
private final TimeUnit _timeUnit;
private final TimeBuckets _timeBuckets;
@@ -32,9 +32,9 @@ public class TimeSeriesContext {
private final ExpressionContext _valueExpression;
private final AggInfo _aggInfo;
- public TimeSeriesContext(String engine, String timeColumn, TimeUnit
timeUnit, TimeBuckets timeBuckets,
+ public TimeSeriesContext(String language, String timeColumn, TimeUnit
timeUnit, TimeBuckets timeBuckets,
Long offsetSeconds, ExpressionContext valueExpression, AggInfo aggInfo) {
- _engine = engine;
+ _language = language;
_timeColumn = timeColumn;
_timeUnit = timeUnit;
_timeBuckets = timeBuckets;
@@ -43,8 +43,8 @@ public class TimeSeriesContext {
_aggInfo = aggInfo;
}
- public String getEngine() {
- return _engine;
+ public String getLanguage() {
+ return _language;
}
public String getTimeColumn() {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index 93ef05949b..c67dbfe240 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.timeseries;
import com.google.common.collect.ImmutableList;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,7 +62,7 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
public TimeSeriesAggregationOperator(
String timeColumn,
TimeUnit timeUnit,
- Long timeOffset,
+ Long timeOffsetSeconds,
AggInfo aggInfo,
ExpressionContext valueExpression,
List<String> groupByExpressions,
@@ -70,7 +71,7 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
TimeSeriesBuilderFactory seriesBuilderFactory) {
_timeColumn = timeColumn;
_storedTimeUnit = timeUnit;
- _timeOffset = timeOffset;
+ _timeOffset = timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds));
_aggInfo = aggInfo;
_valueExpression = valueExpression;
_groupByExpressions = groupByExpressions;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index 43f6df531e..26a9208225 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -127,7 +127,7 @@ public class CombinePlanNode implements PlanNode {
if (QueryContextUtils.isTimeSeriesQuery(_queryContext)) {
return new TimeSeriesCombineOperator(new TimeSeriesAggResultsBlockMerger(
-
TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_queryContext.getTimeSeriesContext().getEngine()),
+
TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_queryContext.getTimeSeriesContext().getLanguage()),
_queryContext.getTimeSeriesContext().getAggInfo()), operators,
_queryContext, _executorService);
} else if (_streamer != null
&& QueryContextUtils.isSelectionOnlyQuery(_queryContext) &&
_queryContext.getLimit() != 0) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
index b5e51e8e29..22e3d7b912 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
@@ -47,7 +47,7 @@ public class TimeSeriesPlanNode implements PlanNode {
_queryContext = queryContext;
_timeSeriesContext =
Objects.requireNonNull(queryContext.getTimeSeriesContext(),
"Missing time-series context in TimeSeriesPlanNode");
- _seriesBuilderFactory =
TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_timeSeriesContext.getEngine());
+ _seriesBuilderFactory =
TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_timeSeriesContext.getLanguage());
}
@Override
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index ff0e0ace8f..6ad0cc3fcb 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -95,7 +95,7 @@ public class QueryExecutorTest {
private static final int NUM_SEGMENTS_TO_GENERATE = 2;
private static final int NUM_EMPTY_SEGMENTS_TO_GENERATE = 2;
private static final ExecutorService QUERY_RUNNERS =
Executors.newFixedThreadPool(20);
- private static final String TIME_SERIES_ENGINE_NAME = "QueryExecutorTest";
+ private static final String TIME_SERIES_LANGUAGE_NAME = "QueryExecutorTest";
private static final String TIME_SERIES_TIME_COL_NAME =
"orderCreatedTimestamp";
private static final Long TIME_SERIES_TEST_START_TIME = 1726228400L;
@@ -171,7 +171,7 @@ public class QueryExecutorTest {
_queryExecutor.init(new PinotConfiguration(queryExecutorConfig),
instanceDataManager, ServerMetrics.get());
// Setup time series builder factory
-
TimeSeriesBuilderFactoryProvider.registerSeriesBuilderFactory(TIME_SERIES_ENGINE_NAME,
+
TimeSeriesBuilderFactoryProvider.registerSeriesBuilderFactory(TIME_SERIES_LANGUAGE_NAME,
new SimpleTimeSeriesBuilderFactory());
}
@@ -219,7 +219,7 @@ public class QueryExecutorTest {
public void testTimeSeriesSumQuery() {
TimeBuckets timeBuckets =
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
ExpressionContext valueExpression =
ExpressionContext.forIdentifier("orderAmount");
- TimeSeriesContext timeSeriesContext = new
TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME,
+ TimeSeriesContext timeSeriesContext = new
TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */,
valueExpression, new AggInfo("SUM"));
QueryContext queryContext =
getQueryContextForTimeSeries(timeSeriesContext);
ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
@@ -235,7 +235,7 @@ public class QueryExecutorTest {
public void testTimeSeriesMaxQuery() {
TimeBuckets timeBuckets =
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
ExpressionContext valueExpression =
ExpressionContext.forIdentifier("orderItemCount");
- TimeSeriesContext timeSeriesContext = new
TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME,
+ TimeSeriesContext timeSeriesContext = new
TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */,
valueExpression, new AggInfo("MAX"));
QueryContext queryContext =
getQueryContextForTimeSeries(timeSeriesContext);
ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
@@ -267,7 +267,7 @@ public class QueryExecutorTest {
public void testTimeSeriesMinQuery() {
TimeBuckets timeBuckets =
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
ExpressionContext valueExpression =
ExpressionContext.forIdentifier("orderItemCount");
- TimeSeriesContext timeSeriesContext = new
TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME,
+ TimeSeriesContext timeSeriesContext = new
TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */,
valueExpression, new AggInfo("MIN"));
QueryContext queryContext =
getQueryContextForTimeSeries(timeSeriesContext);
ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
index 5e42d1b4b5..dc7c704f29 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
@@ -63,9 +63,9 @@ public class PhysicalTimeSeriesPlanVisitor {
for (int index = 0; index < planNode.getChildren().size(); index++) {
BaseTimeSeriesPlanNode childNode = planNode.getChildren().get(index);
if (childNode instanceof LeafTimeSeriesPlanNode) {
- LeafTimeSeriesPlanNode sfpNode = (LeafTimeSeriesPlanNode) childNode;
- List<String> segments =
context.getPlanIdToSegmentsMap().get(sfpNode.getId());
- ServerQueryRequest serverQueryRequest =
compileLeafServerQueryRequest(sfpNode, segments, context);
+ LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) childNode;
+ List<String> segments =
context.getPlanIdToSegmentsMap().get(leafNode.getId());
+ ServerQueryRequest serverQueryRequest =
compileLeafServerQueryRequest(leafNode, segments, context);
TimeSeriesPhysicalTableScan physicalTableScan = new
TimeSeriesPhysicalTableScan(childNode.getId(),
serverQueryRequest, _queryExecutor, _executorService);
planNode.getChildren().set(index, physicalTableScan);
@@ -75,26 +75,24 @@ public class PhysicalTimeSeriesPlanVisitor {
}
}
- public ServerQueryRequest
compileLeafServerQueryRequest(LeafTimeSeriesPlanNode sfpNode, List<String>
segments,
+ public ServerQueryRequest
compileLeafServerQueryRequest(LeafTimeSeriesPlanNode leafNode, List<String>
segments,
TimeSeriesExecutionContext context) {
- return new ServerQueryRequest(compileQueryContext(sfpNode, context),
+ return new ServerQueryRequest(compileQueryContext(leafNode, context),
segments, /* TODO: Pass metadata from request */
Collections.emptyMap(), _serverMetrics);
}
- public QueryContext compileQueryContext(LeafTimeSeriesPlanNode sfpNode,
TimeSeriesExecutionContext context) {
+ public QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode,
TimeSeriesExecutionContext context) {
FilterContext filterContext =
RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(
- sfpNode.getEffectiveFilter(context.getInitialTimeBuckets())));
- List<ExpressionContext> groupByExpressions =
sfpNode.getGroupByColumns().stream()
- .map(ExpressionContext::forIdentifier).collect(Collectors.toList());
- ExpressionContext valueExpression =
RequestContextUtils.getExpression(sfpNode.getValueExpression());
+ leafNode.getEffectiveFilter(context.getInitialTimeBuckets())));
+ List<ExpressionContext> groupByExpressions =
leafNode.getGroupByExpressions().stream()
+ .map(RequestContextUtils::getExpression).collect(Collectors.toList());
+ ExpressionContext valueExpression =
RequestContextUtils.getExpression(leafNode.getValueExpression());
TimeSeriesContext timeSeriesContext = new
TimeSeriesContext(context.getLanguage(),
- sfpNode.getTimeColumn(),
- sfpNode.getTimeUnit(), context.getInitialTimeBuckets(),
sfpNode.getOffset(),
- valueExpression,
- sfpNode.getAggInfo());
+ leafNode.getTimeColumn(), leafNode.getTimeUnit(),
context.getInitialTimeBuckets(), leafNode.getOffsetSeconds(),
+ valueExpression, leafNode.getAggInfo());
return new QueryContext.Builder()
- .setTableName(sfpNode.getTableName())
+ .setTableName(leafNode.getTableName())
.setFilter(filterContext)
.setGroupByExpressions(groupByExpressions)
.setSelectExpressions(Collections.emptyList())
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
new file mode 100644
index 0000000000..81b03fa131
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class PhysicalTimeSeriesPlanVisitorTest {
+ @Test
+ public void testCompileQueryContext() {
+ final String planId = "id";
+ final String tableName = "orderTable";
+ final String timeColumn = "orderTime";
+ final AggInfo aggInfo = new AggInfo("SUM");
+ final String filterExpr = "cityName = 'Chicago'";
+ // Case-1: Without offset, simple column based group-by expression, simple
column based value, and non-empty filter.
+ {
+ TimeSeriesExecutionContext context =
+ new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L,
Duration.ofSeconds(10), 100),
+ Collections.emptyMap());
+ LeafTimeSeriesPlanNode leafNode =
+ new LeafTimeSeriesPlanNode(planId, Collections.emptyList(),
tableName, timeColumn, TimeUnit.SECONDS, 0L,
+ filterExpr, "orderCount", aggInfo,
Collections.singletonList("cityName"));
+ QueryContext queryContext =
PhysicalTimeSeriesPlanVisitor.INSTANCE.compileQueryContext(leafNode, context);
+ assertNotNull(queryContext.getTimeSeriesContext());
+ assertEquals(queryContext.getTimeSeriesContext().getLanguage(), "m3ql");
+ assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 0L);
+ assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(),
timeColumn);
+
assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(),
"orderCount");
+ assertEquals(queryContext.getFilter().toString(),
+ "(cityName = 'Chicago' AND orderTime >= '1000' AND orderTime <=
'2000')");
+ }
+ // Case-2: With offset, complex group-by expression, complex value, and
non-empty filter
+ {
+ TimeSeriesExecutionContext context =
+ new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L,
Duration.ofSeconds(10), 100),
+ Collections.emptyMap());
+ LeafTimeSeriesPlanNode leafNode =
+ new LeafTimeSeriesPlanNode(planId, Collections.emptyList(),
tableName, timeColumn, TimeUnit.SECONDS, 10L,
+ filterExpr, "orderCount*2", aggInfo,
Collections.singletonList("concat(cityName, stateName, '-')"));
+ QueryContext queryContext =
PhysicalTimeSeriesPlanVisitor.INSTANCE.compileQueryContext(leafNode, context);
+ assertNotNull(queryContext);
+ assertNotNull(queryContext.getGroupByExpressions());
+ assertEquals("concat(cityName,stateName,'-')",
queryContext.getGroupByExpressions().get(0).toString());
+ assertNotNull(queryContext.getTimeSeriesContext());
+ assertEquals(queryContext.getTimeSeriesContext().getLanguage(), "m3ql");
+ assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(),
10L);
+ assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(),
timeColumn);
+
assertEquals(queryContext.getTimeSeriesContext().getValueExpression().toString(),
"times(orderCount,'2')");
+ assertNotNull(queryContext.getFilter());
+ assertEquals(queryContext.getFilter().toString(),
+ "(cityName = 'Chicago' AND orderTime >= '990' AND orderTime <=
'1990')");
+ }
+ }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
index 18d6316776..c5f438596c 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
@@ -40,30 +40,29 @@ public class LeafTimeSeriesPlanNode extends
BaseTimeSeriesPlanNode {
private final String _tableName;
private final String _timeColumn;
private final TimeUnit _timeUnit;
- private final Long _offset;
+ private final Long _offsetSeconds;
private final String _filterExpression;
private final String _valueExpression;
private final AggInfo _aggInfo;
- private final List<String> _groupByColumns;
+ private final List<String> _groupByExpressions;
@JsonCreator
public LeafTimeSeriesPlanNode(
@JsonProperty("id") String id, @JsonProperty("children")
List<BaseTimeSeriesPlanNode> children,
@JsonProperty("tableName") String tableName, @JsonProperty("timeColumn")
String timeColumn,
- @JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offset")
Long offset,
+ @JsonProperty("timeUnit") TimeUnit timeUnit,
@JsonProperty("offsetSeconds") Long offsetSeconds,
@JsonProperty("filterExpression") String filterExpression,
- @JsonProperty("valueExpression") String valueExpression,
- @JsonProperty("aggInfo") AggInfo aggInfo,
@JsonProperty("groupByColumns") List<String> groupByColumns) {
+ @JsonProperty("valueExpression") String valueExpression,
@JsonProperty("aggInfo") AggInfo aggInfo,
+ @JsonProperty("groupByExpressions") List<String> groupByExpressions) {
super(id, children);
_tableName = tableName;
_timeColumn = timeColumn;
_timeUnit = timeUnit;
- // TODO: This is broken technically. Adjust offset to meet TimeUnit
resolution. For now use 0 offset.
- _offset = offset;
+ _offsetSeconds = offsetSeconds;
_filterExpression = filterExpression;
_valueExpression = valueExpression;
_aggInfo = aggInfo;
- _groupByColumns = groupByColumns;
+ _groupByExpressions = groupByExpressions;
}
@Override
@@ -78,7 +77,7 @@ public class LeafTimeSeriesPlanNode extends
BaseTimeSeriesPlanNode {
@Override
public BaseTimeSeriesOperator run() {
- throw new UnsupportedOperationException("");
+ throw new UnsupportedOperationException("Leaf plan node is replaced with a
physical plan node at runtime");
}
public String getTableName() {
@@ -93,8 +92,8 @@ public class LeafTimeSeriesPlanNode extends
BaseTimeSeriesPlanNode {
return _timeUnit;
}
- public Long getOffset() {
- return _offset;
+ public Long getOffsetSeconds() {
+ return _offsetSeconds;
}
public String getFilterExpression() {
@@ -109,15 +108,16 @@ public class LeafTimeSeriesPlanNode extends
BaseTimeSeriesPlanNode {
return _aggInfo;
}
- public List<String> getGroupByColumns() {
- return _groupByColumns;
+ public List<String> getGroupByExpressions() {
+ return _groupByExpressions;
}
public String getEffectiveFilter(TimeBuckets timeBuckets) {
String filter = _filterExpression == null ? "" : _filterExpression;
- // TODO: This is wrong. offset should be converted to seconds before
arithmetic. For now use 0 offset.
- long startTime =
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - _offset));
- long endTime =
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getEndTime() - _offset));
+ long startTime =
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() -
_offsetSeconds));
+ long endTime =
+ _timeUnit.convert(Duration.ofSeconds(
+ timeBuckets.getEndTime() + timeBuckets.getBucketSize().toSeconds()
- _offsetSeconds));
String addnFilter = String.format("%s >= %d AND %s <= %d", _timeColumn,
startTime, _timeColumn, endTime);
if (filter.strip().isEmpty()) {
return addnFilter;
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
new file mode 100644
index 0000000000..82694e19a0
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.plan;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class LeafTimeSeriesPlanNodeTest {
+ private static final String ID = "plan_id123";
+ private static final String TABLE = "myTable";
+ private static final String TIME_COLUMN = "orderTime";
+ private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+
+ @Test
+ public void testGetEffectiveFilter() {
+ TimeBuckets timeBuckets = TimeBuckets.ofSeconds(1000,
Duration.ofSeconds(13), 9);
+ final long expectedStartTimeInFilter = 1000;
+ final long expectedEndTimeInFilter = 1000 + 13 * 9;
+ final String nonEmptyFilter = "cityName = 'Chicago'";
+ // Case-1: No offset, and empty filter.
+ {
+ LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID,
Collections.emptyList(), TABLE, TIME_COLUMN,
+ TIME_UNIT, 0L, "", "value_col", new AggInfo("SUM"),
+ Collections.singletonList("cityName"));
+ assertEquals(planNode.getEffectiveFilter(timeBuckets),
+ "orderTime >= " + expectedStartTimeInFilter + " AND orderTime <= " +
expectedEndTimeInFilter);
+ }
+ // Case-2: Offset, but empty filter
+ {
+ LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID,
Collections.emptyList(), TABLE, TIME_COLUMN,
+ TIME_UNIT, 123L, "", "value_col", new AggInfo("SUM"),
+ Collections.singletonList("cityName"));
+ assertEquals(planNode.getEffectiveFilter(timeBuckets),
+ "orderTime >= " + (expectedStartTimeInFilter - 123) + " AND
orderTime <= " + (expectedEndTimeInFilter - 123));
+ }
+ // Case-3: Offset and non-empty filter
+ {
+ LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID,
Collections.emptyList(), TABLE, TIME_COLUMN,
+ TIME_UNIT, 123L, nonEmptyFilter, "value_col", new AggInfo("SUM"),
+ Collections.singletonList("cityName"));
+ assertEquals(planNode.getEffectiveFilter(timeBuckets),
+ String.format("(%s) AND (orderTime >= %s AND orderTime <= %s)",
nonEmptyFilter,
+ (expectedStartTimeInFilter - 123), (expectedEndTimeInFilter -
123)));
+ }
+ // Case-4: Offset, and non-empty filter, and time-unit that is not seconds
+ {
+ LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID,
Collections.emptyList(), TABLE, TIME_COLUMN,
+ TimeUnit.MILLISECONDS, 123L, nonEmptyFilter, "value_col", new
AggInfo("SUM"),
+ Collections.singletonList("cityName"));
+ assertEquals(planNode.getEffectiveFilter(timeBuckets),
+ String.format("(%s) AND (orderTime >= %s AND orderTime <= %s)",
nonEmptyFilter,
+ (expectedStartTimeInFilter * 1000 - 123 * 1000),
(expectedEndTimeInFilter * 1000 - 123 * 1000)));
+ }
+ }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
index a5015dc991..df66ea8fd9 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
@@ -43,10 +43,10 @@ public class TimeSeriesPlanSerdeTest {
assertEquals(deserializedNode.getTableName(), "myTable");
assertEquals(deserializedNode.getTimeColumn(), "myTimeColumn");
assertEquals(deserializedNode.getTimeUnit(), TimeUnit.MILLISECONDS);
- assertEquals(deserializedNode.getOffset(), 0L);
+ assertEquals(deserializedNode.getOffsetSeconds(), 0L);
assertEquals(deserializedNode.getFilterExpression(), "myFilterExpression");
assertEquals(deserializedNode.getValueExpression(), "myValueExpression");
assertNotNull(deserializedNode.getAggInfo());
- assertEquals(deserializedNode.getGroupByColumns().size(), 0);
+ assertEquals(deserializedNode.getGroupByExpressions().size(), 0);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]