This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 63c1746  Fix timeseries query constructor when postAggregator has an expression reading timestamp result column (#10198)
63c1746 is described below

commit 63c1746fe40fc40d60dbc77f411e58949c102bfa
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Jul 27 10:54:44 2020 -0700

    Fix timeseries query constructor when postAggregator has an expression reading timestamp result column (#10198)
    
    * Fix timeseries query constructor when postAggregator has an expression reading timestamp result column
    
    * fix npe
    
    * Fix postAgg referencing timestampResultField and add a test for it
    
    * fix test
    
    * doc
    
    * revert doc
---
 .../druid/query/timeseries/TimeseriesQuery.java    |  5 +-
 .../timeseries/TimeseriesQueryQueryToolChest.java  | 16 ++--
 .../groupby/GroupByTimeseriesQueryRunnerTest.java  | 55 +++++++++++--
 .../timeseries/TimeseriesQueryRunnerTest.java      | 89 ++++++++++++++++++----
 .../apache/druid/sql/calcite/CalciteQueryTest.java | 46 +++++++++++
 5 files changed, 183 insertions(+), 28 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
index 6603ee0..4756707 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
@@ -78,11 +78,14 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
   {
     super(dataSource, querySegmentSpec, descending, context, granularity);
 
+    // The below should be executed after context is initialized.
+    final String timestampField = getTimestampResultField();
+
     this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
     this.dimFilter = dimFilter;
     this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs;
     this.postAggregatorSpecs = Queries.prepareAggregations(
-        ImmutableList.of(),
+        timestampField == null ? ImmutableList.of() : ImmutableList.of(timestampField),
         this.aggregatorSpecs,
         postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs
     );
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 9b4c94b..d811914 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -412,7 +412,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
   @Override
   public RowSignature resultArraySignature(TimeseriesQuery query)
   {
-
     RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
     rowSignatureBuilder.addTimeColumn();
     if (StringUtils.isNotEmpty(query.getTimestampResultField())) {
@@ -460,6 +459,14 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
       final TimeseriesResultValue holder = result.getValue();
       final Map<String, Object> values = new HashMap<>(holder.getBaseObject());
       if (calculatePostAggs) {
+        // If "timestampResultField" is set, we must include a copy of the timestamp in the result.
+        // This is used by the SQL layer when it generates a Timeseries query for a group-by-time-floor SQL query.
+        // The SQL layer expects the result of the time-floor to have a specific name that is not going to be "__time".
+        // This should be done before computing post aggregators since they can reference "timestampResultField".
+        if (StringUtils.isNotEmpty(query.getTimestampResultField()) && result.getTimestamp() != null) {
+          final DateTime timestamp = result.getTimestamp();
+          values.put(query.getTimestampResultField(), timestamp.getMillis());
+        }
         if (!query.getPostAggregatorSpecs().isEmpty()) {
           // put non finalized aggregators for calculating dependent post Aggregators
           for (AggregatorFactory agg : query.getAggregatorSpecs()) {
@@ -469,13 +476,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
             values.put(postAgg.getName(), postAgg.compute(values));
           }
         }
-        // If "timestampResultField" is set, we must include a copy of the timestamp in the result.
-        // This is used by the SQL layer when it generates a Timeseries query for a group-by-time-floor SQL query.
-        // The SQL layer expects the result of the time-floor to have a specific name that is not going to be "__time".
-        if (StringUtils.isNotEmpty(query.getTimestampResultField()) && result.getTimestamp() != null) {
-          final DateTime timestamp = result.getTimestamp();
-          values.put(query.getTimestampResultField(), timestamp.getMillis());
-        }
       }
       for (AggregatorFactory agg : query.getAggregatorSpecs()) {
         values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
index 9e7b45a..a189e6a 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.io.Closer;
@@ -40,9 +41,15 @@ import org.apache.druid.query.Result;
 import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
 import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryRunnerTest;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.joda.time.DateTime;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -52,10 +59,11 @@ import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /**
- *
+ * This class is for testing both timeseries and groupBy queries with the same set of queries.
  */
 @RunWith(Parameterized.class)
 public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
@@ -99,15 +107,36 @@ public class GroupByTimeseriesQueryRunnerTest extends 
TimeseriesQueryRunnerTest
               toolChest
           );
 
+          final String timeDimension = tsQuery.getTimestampResultField();
+          final List<VirtualColumn> virtualColumns = new ArrayList<>(
+              Arrays.asList(tsQuery.getVirtualColumns().getVirtualColumns())
+          );
+          if (timeDimension != null) {
+            final PeriodGranularity granularity = (PeriodGranularity) 
tsQuery.getGranularity();
+            virtualColumns.add(
+                new ExpressionVirtualColumn(
+                    "v0",
+                    StringUtils.format("timestamp_floor(__time, '%s')", 
granularity.getPeriod()),
+                    ValueType.LONG,
+                    TestExprMacroTable.INSTANCE
+                )
+            );
+          }
+
           GroupByQuery newQuery = GroupByQuery
               .builder()
               .setDataSource(tsQuery.getDataSource())
               .setQuerySegmentSpec(tsQuery.getQuerySegmentSpec())
               .setGranularity(tsQuery.getGranularity())
               .setDimFilter(tsQuery.getDimensionsFilter())
+              .setDimensions(
+                  timeDimension == null
+                  ? ImmutableList.of()
+                  : ImmutableList.of(new DefaultDimensionSpec("v0", 
timeDimension, ValueType.LONG))
+              )
               .setAggregatorSpecs(tsQuery.getAggregatorSpecs())
               .setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
-              .setVirtualColumns(tsQuery.getVirtualColumns())
+              .setVirtualColumns(VirtualColumns.create(virtualColumns))
               .setContext(tsQuery.getContext())
               .build();
 
@@ -239,14 +268,28 @@ public class GroupByTimeseriesQueryRunnerTest extends 
TimeseriesQueryRunnerTest
   @Override
   public void testTimeseriesWithTimestampResultFieldContextForArrayResponse()
   {
-    // Skip this test because the timeseries test expects an extra column to 
be created (map from the timestamp_floor
-    // of the timestamp dimension) but group by doesn't do this.
+    // Cannot vectorize with an expression virtual column
+    if (!vectorize) {
+      super.testTimeseriesWithTimestampResultFieldContextForArrayResponse();
+    }
   }
 
   @Override
   public void testTimeseriesWithTimestampResultFieldContextForMapResponse()
   {
-    // Skip this test because the timeseries test expects an extra column to 
be created (map from the timestamp_floor
-    // of the timestamp dimension) but group by doesn't do this.
+    // Cannot vectorize with an expression virtual column
+    if (!vectorize) {
+      super.testTimeseriesWithTimestampResultFieldContextForMapResponse();
+    }
+  }
+
+  @Override
+  @Test
+  public void testTimeseriesWithPostAggregatorReferencingTimestampResultField()
+  {
+    // Cannot vectorize with an expression virtual column
+    if (!vectorize) {
+      super.testTimeseriesWithPostAggregatorReferencingTimestampResultField();
+    }
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
 
b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
index 99989c0..4579028 100644
--- 
a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
@@ -47,6 +47,7 @@ import 
org.apache.druid.query.aggregation.FilteredAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
+import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.extraction.MapLookupExtractor;
 import org.apache.druid.query.filter.AndDimFilter;
@@ -1619,7 +1620,6 @@ public class TimeseriesQueryRunnerTest extends 
InitializedNullHandlingTest
                                   
.intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
                                   .aggregators(aggregatorFactoryList)
                                   
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
-                                  .context(ImmutableMap.of("skipEmptyBuckets", 
"true"))
                                   .descending(descending)
                                   
.context(makeContext(ImmutableMap.of("skipEmptyBuckets", "true")))
                                   .build();
@@ -2489,9 +2489,14 @@ public class TimeseriesQueryRunnerTest extends 
InitializedNullHandlingTest
                                   )
                                   
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
                                   .descending(descending)
-                                  .context(ImmutableMap.of(
-                                      
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME
-                                  ))
+                                  .context(
+                                      makeContext(
+                                          ImmutableMap.of(
+                                              
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME,
+                                              "skipEmptyBuckets", true
+                                          )
+                                      )
+                                  )
                                   .build();
 
     Assert.assertEquals(TIMESTAMP_RESULT_FIELD_NAME, 
query.getTimestampResultField());
@@ -2518,6 +2523,9 @@ public class TimeseriesQueryRunnerTest extends 
InitializedNullHandlingTest
     final String[] expectedIndex = descending ?
                                    
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES_DESC :
                                    
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES;
+    final String[] expectedIndexToUse = Arrays.stream(expectedIndex)
+                                              .filter(eachIndex -> 
!"0.0".equals(eachIndex))
+                                              .toArray(String[]::new);
 
     final Long expectedLast = descending ?
                                   QueryRunnerTestHelper.EARLIEST.getMillis() :
@@ -2545,7 +2553,7 @@ public class TimeseriesQueryRunnerTest extends 
InitializedNullHandlingTest
 
       if (QueryRunnerTestHelper.SKIPPED_DAY.getMillis() != current) {
         Assert.assertEquals(
-            Doubles.tryParse(expectedIndex[count]).doubleValue(),
+            Doubles.tryParse(expectedIndexToUse[count]).doubleValue(),
             (Double) result[3],
             (Double) result[3] * 1e-6
         );
@@ -2555,7 +2563,7 @@ public class TimeseriesQueryRunnerTest extends 
InitializedNullHandlingTest
             0.02
         );
         Assert.assertEquals(
-            new Double(expectedIndex[count]) + 13L + 1L,
+            new Double(expectedIndexToUse[count]) + 13L + 1L,
             (Double) result[5],
             (Double) result[5] * 1e-6
         );
@@ -2572,7 +2580,7 @@ public class TimeseriesQueryRunnerTest extends 
InitializedNullHandlingTest
               0.02
           );
           Assert.assertEquals(
-              new Double(expectedIndex[count]) + 1L,
+              new Double(expectedIndexToUse[count]) + 1L,
               (Double) result[5],
               (Double) result[5] * 1e-6
           );
@@ -2612,9 +2620,14 @@ public class TimeseriesQueryRunnerTest extends 
InitializedNullHandlingTest
                                   )
                                   
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
                                   .descending(descending)
-                                  .context(ImmutableMap.of(
-                                      
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME
-                                  ))
+                                  .context(
+                                      makeContext(
+                                          ImmutableMap.of(
+                                              
TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, TIMESTAMP_RESULT_FIELD_NAME,
+                                              "skipEmptyBuckets", true
+                                          )
+                                      )
+                                  )
                                   .build();
 
     Assert.assertEquals(TIMESTAMP_RESULT_FIELD_NAME, 
query.getTimestampResultField());
@@ -2624,6 +2637,9 @@ public class TimeseriesQueryRunnerTest extends 
InitializedNullHandlingTest
     final String[] expectedIndex = descending ?
                                    
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES_DESC :
                                    
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES;
+    final String[] expectedIndexToUse = Arrays.stream(expectedIndex)
+                                              .filter(eachIndex -> 
!"0.0".equals(eachIndex))
+                                              .toArray(String[]::new);
 
     final DateTime expectedLast = descending ?
                                   QueryRunnerTestHelper.EARLIEST :
@@ -2655,13 +2671,13 @@ public class TimeseriesQueryRunnerTest extends 
InitializedNullHandlingTest
       if (!QueryRunnerTestHelper.SKIPPED_DAY.equals(current)) {
         Assert.assertEquals(
             result.toString(),
-            Doubles.tryParse(expectedIndex[count]).doubleValue(),
+            Doubles.tryParse(expectedIndexToUse[count]).doubleValue(),
             value.getDoubleMetric("index").doubleValue(),
             value.getDoubleMetric("index").doubleValue() * 1e-6
         );
         Assert.assertEquals(
             result.toString(),
-            new Double(expectedIndex[count]) +
+            new Double(expectedIndexToUse[count]) +
             13L + 1L,
             value.getDoubleMetric("addRowsIndexConstant"),
             value.getDoubleMetric("addRowsIndexConstant") * 1e-6
@@ -2681,7 +2697,7 @@ public class TimeseriesQueryRunnerTest extends 
InitializedNullHandlingTest
           );
           Assert.assertEquals(
               result.toString(),
-              new Double(expectedIndex[count]) + 1L,
+              new Double(expectedIndexToUse[count]) + 1L,
               value.getDoubleMetric("addRowsIndexConstant"),
               value.getDoubleMetric("addRowsIndexConstant") * 1e-6
           );
@@ -2811,6 +2827,53 @@ public class TimeseriesQueryRunnerTest extends 
InitializedNullHandlingTest
     Assert.assertEquals(10, list.size());
   }
 
+  @Test
+  public void testTimeseriesWithPostAggregatorReferencingTimestampResultField()
+  {
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+                                  .granularity(QueryRunnerTestHelper.DAY_GRAN)
+                                  
.filters(QueryRunnerTestHelper.MARKET_DIMENSION, "spot")
+                                  
.intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+                                  .postAggregators(
+                                      new 
FieldAccessPostAggregator("timestampInPostAgg", "myTimestamp")
+                                  )
+                                  .descending(descending)
+                                  .context(
+                                      makeContext(
+                                          
ImmutableMap.of(TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, "myTimestamp")
+                                      )
+                                  )
+                                  .build();
+
+    final DateTime aprilFirst = DateTimes.of("2011-04-01");
+    final DateTime aprilSecond = DateTimes.of("2011-04-02");
+
+    List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
+        new Result<>(
+            aprilFirst,
+            new TimeseriesResultValue(
+                ImmutableMap.of(
+                    "myTimestamp", aprilFirst.getMillis(),
+                    "timestampInPostAgg", aprilFirst.getMillis()
+                )
+            )
+        ),
+        new Result<>(
+            aprilSecond,
+            new TimeseriesResultValue(
+                ImmutableMap.of(
+                    "myTimestamp", aprilSecond.getMillis(),
+                    "timestampInPostAgg", aprilSecond.getMillis()
+                )
+            )
+        )
+    );
+
+    Iterable<Result<TimeseriesResultValue>> results = 
runner.run(QueryPlus.wrap(query)).toList();
+    assertExpectedResults(expectedResults, results);
+  }
+
   private Map<String, Object> makeContext()
   {
     return makeContext(ImmutableMap.of());
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index deffe20..e8fda2a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -204,6 +204,52 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   }
 
   @Test
+  public void 
testGroupByWithPostAggregatorReferencingTimeFloorColumnOnTimeseries() throws 
Exception
+  {
+    cannotVectorize();
+
+    testQuery(
+        "SELECT TIME_FORMAT(\"date\", 'yyyy-MM'), SUM(x)\n"
+        + "FROM (\n"
+        + "    SELECT\n"
+        + "        FLOOR(__time to hour) as \"date\",\n"
+        + "        COUNT(*) as x\n"
+        + "    FROM foo\n"
+        + "    GROUP BY 1\n"
+        + ")\n"
+        + "GROUP BY 1",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(CalciteTests.DATASOURCE1)
+                                  
.intervals(querySegmentSpec(Filtration.eternity()))
+                                  .granularity(Granularities.HOUR)
+                                  .aggregators(aggregators(new 
CountAggregatorFactory("a0")))
+                                  
.context(getTimeseriesContextWithFloorTime(TIMESERIES_CONTEXT_DEFAULT, "d0"))
+                                  .build()
+                        )
+                        .setInterval(querySegmentSpec(Intervals.ETERNITY))
+                        .setVirtualColumns(
+                            expressionVirtualColumn(
+                                "v0",
+                                "timestamp_format(\"d0\",'yyyy-MM','UTC')",
+                                ValueType.STRING
+                            )
+                        )
+                        .setGranularity(Granularities.ALL)
+                        .addDimension(new DefaultDimensionSpec("v0", "_d0"))
+                        .addAggregator(new LongSumAggregatorFactory("_a0", 
"a0"))
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"2000-01", 3L},
+            new Object[]{"2001-01", 3L}
+        )
+    );
+  }
+
+  @Test
   public void testJoinOuterGroupByAndSubqueryHasLimit() throws Exception
   {
     // Cannot vectorize JOIN operator.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to