This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 63c1746 Fix timeseries query constructor when postAggregator has an
expression reading timestamp result column (#10198)
63c1746 is described below
commit 63c1746fe40fc40d60dbc77f411e58949c102bfa
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Jul 27 10:54:44 2020 -0700
Fix timeseries query constructor when postAggregator has an expression
reading timestamp result column (#10198)
* Fix timeseries query constructor when postAggregator has an expression reading timestamp result column
* fix npe
* Fix postAgg referencing timestampResultField and add a test for it
* fix test
* doc
* revert doc
---
.../druid/query/timeseries/TimeseriesQuery.java | 5 +-
.../timeseries/TimeseriesQueryQueryToolChest.java | 16 ++--
.../groupby/GroupByTimeseriesQueryRunnerTest.java | 55 +++++++++++--
.../timeseries/TimeseriesQueryRunnerTest.java | 89 ++++++++++++++++++----
.../apache/druid/sql/calcite/CalciteQueryTest.java | 46 +++++++++++
5 files changed, 183 insertions(+), 28 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
index 6603ee0..4756707 100644
---
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
+++
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
@@ -78,11 +78,14 @@ public class TimeseriesQuery extends
BaseQuery<Result<TimeseriesResultValue>>
{
super(dataSource, querySegmentSpec, descending, context, granularity);
+ // The below should be executed after context is initialized.
+ final String timestampField = getTimestampResultField();
+
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.dimFilter = dimFilter;
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() :
aggregatorSpecs;
this.postAggregatorSpecs = Queries.prepareAggregations(
- ImmutableList.of(),
+ timestampField == null ? ImmutableList.of() :
ImmutableList.of(timestampField),
this.aggregatorSpecs,
postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs
);
diff --git
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 9b4c94b..d811914 100644
---
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -412,7 +412,6 @@ public class TimeseriesQueryQueryToolChest extends
QueryToolChest<Result<Timeser
@Override
public RowSignature resultArraySignature(TimeseriesQuery query)
{
-
RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
rowSignatureBuilder.addTimeColumn();
if (StringUtils.isNotEmpty(query.getTimestampResultField())) {
@@ -460,6 +459,14 @@ public class TimeseriesQueryQueryToolChest extends
QueryToolChest<Result<Timeser
final TimeseriesResultValue holder = result.getValue();
final Map<String, Object> values = new HashMap<>(holder.getBaseObject());
if (calculatePostAggs) {
+ // If "timestampResultField" is set, we must include a copy of the timestamp in the result.
+ // This is used by the SQL layer when it generates a Timeseries query for a group-by-time-floor SQL query.
+ // The SQL layer expects the result of the time-floor to have a specific name that is not going to be "__time".
+ // This should be done before computing post aggregators since they can reference "timestampResultField".
+ if (StringUtils.isNotEmpty(query.getTimestampResultField()) &&
result.getTimestamp() != null) {
+ final DateTime timestamp = result.getTimestamp();
+ values.put(query.getTimestampResultField(), timestamp.getMillis());
+ }
if (!query.getPostAggregatorSpecs().isEmpty()) {
// put non finalized aggregators for calculating dependent post
Aggregators
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
@@ -469,13 +476,6 @@ public class TimeseriesQueryQueryToolChest extends
QueryToolChest<Result<Timeser
values.put(postAgg.getName(), postAgg.compute(values));
}
}
- // If "timestampResultField" is set, we must include a copy of the timestamp in the result.
- // This is used by the SQL layer when it generates a Timeseries query for a group-by-time-floor SQL query.
- // The SQL layer expects the result of the time-floor to have a specific name that is not going to be "__time".
- if (StringUtils.isNotEmpty(query.getTimestampResultField()) &&
result.getTimestamp() != null) {
- final DateTime timestamp = result.getTimestamp();
- values.put(query.getTimestampResultField(), timestamp.getMillis());
- }
}
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg,
holder.getMetric(agg.getName())));
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
index 9e7b45a..a189e6a 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
@@ -40,9 +41,15 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerTest;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -52,10 +59,11 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/**
- *
+ * This class is for testing both timeseries and groupBy queries with the same set of queries.
*/
@RunWith(Parameterized.class)
public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
@@ -99,15 +107,36 @@ public class GroupByTimeseriesQueryRunnerTest extends
TimeseriesQueryRunnerTest
toolChest
);
+ final String timeDimension = tsQuery.getTimestampResultField();
+ final List<VirtualColumn> virtualColumns = new ArrayList<>(
+ Arrays.asList(tsQuery.getVirtualColumns().getVirtualColumns())
+ );
+ if (timeDimension != null) {
+ final PeriodGranularity granularity = (PeriodGranularity)
tsQuery.getGranularity();
+ virtualColumns.add(
+ new ExpressionVirtualColumn(
+ "v0",
+ StringUtils.format("timestamp_floor(__time, '%s')",
granularity.getPeriod()),
+ ValueType.LONG,
+ TestExprMacroTable.INSTANCE
+ )
+ );
+ }
+
GroupByQuery newQuery = GroupByQuery
.builder()
.setDataSource(tsQuery.getDataSource())
.setQuerySegmentSpec(tsQuery.getQuerySegmentSpec())
.setGranularity(tsQuery.getGranularity())
.setDimFilter(tsQuery.getDimensionsFilter())
+ .setDimensions(
+ timeDimension == null
+ ? ImmutableList.of()
+ : ImmutableList.of(new DefaultDimensionSpec("v0",
timeDimension, ValueType.LONG))
+ )
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
- .setVirtualColumns(tsQuery.getVirtualColumns())
+ .setVirtualColumns(VirtualColumns.create(virtualColumns))
.setContext(tsQuery.getContext())
.build();
@@ -239,14 +268,28 @@ public class GroupByTimeseriesQueryRunnerTest extends
TimeseriesQueryRunnerTest
@Override
public void testTimeseriesWithTimestampResultFieldContextForArrayResponse()
{
- // Skip this test because the timeseries test expects an extra column to be created (map from the timestamp_floor
- // of the timestamp dimension) but group by doesn't do this.
+ // Cannot vectorize with an expression virtual column
+ if (!vectorize) {
+ super.testTimeseriesWithTimestampResultFieldContextForArrayResponse();
+ }
}
@Override
public void testTimeseriesWithTimestampResultFieldContextForMapResponse()
{
- // Skip this test because the timeseries test expects an extra column to be created (map from the timestamp_floor
- // of the timestamp dimension) but group by doesn't do this.
+ // Cannot vectorize with an expression virtual column
+ if (!vectorize) {
+ super.testTimeseriesWithTimestampResultFieldContextForMapResponse();
+ }
+ }
+
+ @Override
+ @Test
+ public void testTimeseriesWithPostAggregatorReferencingTimestampResultField()
+ {
+ // Cannot vectorize with an expression virtual column
+ if (!vectorize) {
+ super.testTimeseriesWithPostAggregatorReferencingTimestampResultField();
+ }
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
index 99989c0..4579028 100644
---
a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
@@ -47,6 +47,7 @@ import
org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
+import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.filter.AndDimFilter;
@@ -1619,7 +1620,6 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
.intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
.aggregators(aggregatorFactoryList)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
- .context(ImmutableMap.of("skipEmptyBuckets",
"true"))
.descending(descending)
.context(makeContext(ImmutableMap.of("skipEmptyBuckets", "true")))
.build();
@@ -2489,9 +2489,14 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
.descending(descending)
- .context(ImmutableMap.of(
-
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME
- ))
+ .context(
+ makeContext(
+ ImmutableMap.of(
+
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME,
+ "skipEmptyBuckets", true
+ )
+ )
+ )
.build();
Assert.assertEquals(TIMESTAMP_RESULT_FIELD_NAME,
query.getTimestampResultField());
@@ -2518,6 +2523,9 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
final String[] expectedIndex = descending ?
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES_DESC :
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES;
+ final String[] expectedIndexToUse = Arrays.stream(expectedIndex)
+ .filter(eachIndex ->
!"0.0".equals(eachIndex))
+ .toArray(String[]::new);
final Long expectedLast = descending ?
QueryRunnerTestHelper.EARLIEST.getMillis() :
@@ -2545,7 +2553,7 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
if (QueryRunnerTestHelper.SKIPPED_DAY.getMillis() != current) {
Assert.assertEquals(
- Doubles.tryParse(expectedIndex[count]).doubleValue(),
+ Doubles.tryParse(expectedIndexToUse[count]).doubleValue(),
(Double) result[3],
(Double) result[3] * 1e-6
);
@@ -2555,7 +2563,7 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
0.02
);
Assert.assertEquals(
- new Double(expectedIndex[count]) + 13L + 1L,
+ new Double(expectedIndexToUse[count]) + 13L + 1L,
(Double) result[5],
(Double) result[5] * 1e-6
);
@@ -2572,7 +2580,7 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
0.02
);
Assert.assertEquals(
- new Double(expectedIndex[count]) + 1L,
+ new Double(expectedIndexToUse[count]) + 1L,
(Double) result[5],
(Double) result[5] * 1e-6
);
@@ -2612,9 +2620,14 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
.descending(descending)
- .context(ImmutableMap.of(
-
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME
- ))
+ .context(
+ makeContext(
+ ImmutableMap.of(
+
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME,
+ "skipEmptyBuckets", true
+ )
+ )
+ )
.build();
Assert.assertEquals(TIMESTAMP_RESULT_FIELD_NAME,
query.getTimestampResultField());
@@ -2624,6 +2637,9 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
final String[] expectedIndex = descending ?
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES_DESC :
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES;
+ final String[] expectedIndexToUse = Arrays.stream(expectedIndex)
+ .filter(eachIndex ->
!"0.0".equals(eachIndex))
+ .toArray(String[]::new);
final DateTime expectedLast = descending ?
QueryRunnerTestHelper.EARLIEST :
@@ -2655,13 +2671,13 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
if (!QueryRunnerTestHelper.SKIPPED_DAY.equals(current)) {
Assert.assertEquals(
result.toString(),
- Doubles.tryParse(expectedIndex[count]).doubleValue(),
+ Doubles.tryParse(expectedIndexToUse[count]).doubleValue(),
value.getDoubleMetric("index").doubleValue(),
value.getDoubleMetric("index").doubleValue() * 1e-6
);
Assert.assertEquals(
result.toString(),
- new Double(expectedIndex[count]) +
+ new Double(expectedIndexToUse[count]) +
13L + 1L,
value.getDoubleMetric("addRowsIndexConstant"),
value.getDoubleMetric("addRowsIndexConstant") * 1e-6
@@ -2681,7 +2697,7 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
);
Assert.assertEquals(
result.toString(),
- new Double(expectedIndex[count]) + 1L,
+ new Double(expectedIndexToUse[count]) + 1L,
value.getDoubleMetric("addRowsIndexConstant"),
value.getDoubleMetric("addRowsIndexConstant") * 1e-6
);
@@ -2811,6 +2827,53 @@ public class TimeseriesQueryRunnerTest extends
InitializedNullHandlingTest
Assert.assertEquals(10, list.size());
}
+ @Test
+ public void testTimeseriesWithPostAggregatorReferencingTimestampResultField()
+ {
+ TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .granularity(QueryRunnerTestHelper.DAY_GRAN)
+
.filters(QueryRunnerTestHelper.MARKET_DIMENSION, "spot")
+
.intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .postAggregators(
+ new
FieldAccessPostAggregator("timestampInPostAgg", "myTimestamp")
+ )
+ .descending(descending)
+ .context(
+ makeContext(
+
ImmutableMap.of(TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, "myTimestamp")
+ )
+ )
+ .build();
+
+ final DateTime aprilFirst = DateTimes.of("2011-04-01");
+ final DateTime aprilSecond = DateTimes.of("2011-04-02");
+
+ List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
+ new Result<>(
+ aprilFirst,
+ new TimeseriesResultValue(
+ ImmutableMap.of(
+ "myTimestamp", aprilFirst.getMillis(),
+ "timestampInPostAgg", aprilFirst.getMillis()
+ )
+ )
+ ),
+ new Result<>(
+ aprilSecond,
+ new TimeseriesResultValue(
+ ImmutableMap.of(
+ "myTimestamp", aprilSecond.getMillis(),
+ "timestampInPostAgg", aprilSecond.getMillis()
+ )
+ )
+ )
+ );
+
+ Iterable<Result<TimeseriesResultValue>> results =
runner.run(QueryPlus.wrap(query)).toList();
+ assertExpectedResults(expectedResults, results);
+ }
+
private Map<String, Object> makeContext()
{
return makeContext(ImmutableMap.of());
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index deffe20..e8fda2a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -204,6 +204,52 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
}
@Test
+ public void
testGroupByWithPostAggregatorReferencingTimeFloorColumnOnTimeseries() throws
Exception
+ {
+ cannotVectorize();
+
+ testQuery(
+ "SELECT TIME_FORMAT(\"date\", 'yyyy-MM'), SUM(x)\n"
+ + "FROM (\n"
+ + " SELECT\n"
+ + " FLOOR(__time to hour) as \"date\",\n"
+ + " COUNT(*) as x\n"
+ + " FROM foo\n"
+ + " GROUP BY 1\n"
+ + ")\n"
+ + "GROUP BY 1",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+
.intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.HOUR)
+ .aggregators(aggregators(new
CountAggregatorFactory("a0")))
+
.context(getTimeseriesContextWithFloorTime(TIMESERIES_CONTEXT_DEFAULT, "d0"))
+ .build()
+ )
+ .setInterval(querySegmentSpec(Intervals.ETERNITY))
+ .setVirtualColumns(
+ expressionVirtualColumn(
+ "v0",
+ "timestamp_format(\"d0\",'yyyy-MM','UTC')",
+ ValueType.STRING
+ )
+ )
+ .setGranularity(Granularities.ALL)
+ .addDimension(new DefaultDimensionSpec("v0", "_d0"))
+ .addAggregator(new LongSumAggregatorFactory("_a0",
"a0"))
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"2000-01", 3L},
+ new Object[]{"2001-01", 3L}
+ )
+ );
+ }
+
+ @Test
public void testJoinOuterGroupByAndSubqueryHasLimit() throws Exception
{
// Cannot vectorize JOIN operator.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]