This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aca61763e SQL: Use timestamp_floor when granularity is not safe. 
(#13206)
6aca61763e is described below

commit 6aca61763ef16f232c0059c4eaade0e59acf774a
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Oct 17 08:22:45 2022 -0700

    SQL: Use timestamp_floor when granularity is not safe. (#13206)
    
    * SQL: Use timestamp_floor when granularity is not safe.
    
    PR #12944 added a check at the execution layer to avoid materializing
    excessive amounts of time-granular buckets. This patch modifies the SQL
    planner to avoid generating queries that would throw such errors, by
    switching certain plans to use the timestamp_floor function instead of
    granularities. This applies both to the Timeseries query type, and the
    GroupBy timestampResultFieldGranularity feature.
    
    The patch also goes one step further: we switch to timestamp_floor
    not just in the ETERNITY + non-ALL case, but also if the estimated
    number of time-granular buckets exceeds 100,000.
    
    Finally, the patch modifies the timestampResultFieldGranularity
    field to consistently be a String rather than a Granularity. This
    ensures that it can be round-trip serialized and deserialized, which is
    useful when trying to execute the results of "EXPLAIN PLAN FOR" with
    GroupBy queries that use the timestampResultFieldGranularity feature.
    
    * Fix test, address PR comments.
    
    * Fix ControllerImpl.
    
    * Fix test.
    
    * Fix unused import.
---
 .../benchmark/GroupByTypeInterfaceBenchmark.java   |   2 +
 .../query/CachingClusteredClientBenchmark.java     |   1 +
 .../druid/benchmark/query/GroupByBenchmark.java    |   2 +
 .../druid/segment/MapVirtualColumnGroupByTest.java |   1 +
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  39 ++++--
 .../java/org/apache/druid/query/QueryContext.java  |  19 +--
 .../query/groupby/strategy/GroupByStrategyV2.java  |   6 +-
 .../druid/segment/RowBasedStorageAdapter.java      |  23 +++-
 ...GroupByLimitPushDownInsufficientBufferTest.java |   3 +
 .../GroupByLimitPushDownMultiNodeMergeTest.java    |   3 +
 .../query/groupby/GroupByMultiSegmentTest.java     |   2 +
 .../query/groupby/GroupByQueryMergeBufferTest.java |   2 +
 .../groupby/GroupByQueryRunnerFailureTest.java     |   2 +
 .../query/groupby/GroupByQueryRunnerTest.java      |   1 +
 .../groupby/GroupByTimeseriesQueryRunnerTest.java  |  14 +-
 .../query/groupby/NestedQueryPushDownTest.java     |   3 +
 .../apache/druid/sql/calcite/rel/DruidQuery.java   |  76 ++++++++++-
 .../druid/sql/calcite/BaseCalciteQueryTest.java    |   9 +-
 .../sql/calcite/CalciteCorrelatedQueryTest.java    |   7 +-
 .../apache/druid/sql/calcite/CalciteQueryTest.java | 146 +++++++++++++++++----
 20 files changed, 300 insertions(+), 61 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index 8e885856ba..cd6c48a16e 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -66,6 +66,7 @@ import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
@@ -401,6 +402,7 @@ public class GroupByTypeInterfaceBenchmark
             configSupplier,
             bufferPool,
             mergePool,
+            TestHelper.makeJsonMapper(),
             new ObjectMapper(new SmileFactory()),
             QueryBenchmarkUtil.NOOP_QUERYWATCHER
         )
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index bc480c1063..4b0d55c2c6 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -378,6 +378,7 @@ public class CachingClusteredClientBenchmark
             bufferPool,
             mergeBufferPool,
             mapper,
+            mapper,
             QueryRunnerTestHelper.NOOP_QUERYWATCHER
         )
     );
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 9dd0ea5c5e..602126ba4a 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -81,6 +81,7 @@ import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.generator.DataGenerator;
@@ -516,6 +517,7 @@ public class GroupByBenchmark
             configSupplier,
             bufferPool,
             mergePool,
+            TestHelper.makeJsonMapper(),
             new ObjectMapper(new SmileFactory()),
             QueryBenchmarkUtil.NOOP_QUERYWATCHER
         )
diff --git 
a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
 
b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
index 3e0cda4aed..cd746aaff7 100644
--- 
a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
+++ 
b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
@@ -101,6 +101,7 @@ public class MapVirtualColumnGroupByTest extends 
InitializedNullHandlingTest
             GroupByQueryConfig::new,
             new StupidPool<>("map-virtual-column-groupby-test", () -> 
ByteBuffer.allocate(1024)),
             new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1),
+            TestHelper.makeJsonMapper(),
             new DefaultObjectMapper(),
             QueryRunnerTestHelper.NOOP_QUERYWATCHER
         )
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index b5bb6bdd2b..cf5a8902a1 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.exec;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -523,7 +524,8 @@ public class ControllerImpl implements Controller
     final QueryDefinition queryDef = makeQueryDefinition(
         id(),
         makeQueryControllerToolKit(),
-        task.getQuerySpec()
+        task.getQuerySpec(),
+        context.jsonMapper()
     );
 
     QueryValidator.validateQueryDef(queryDef);
@@ -1311,7 +1313,8 @@ public class ControllerImpl implements Controller
   private static QueryDefinition makeQueryDefinition(
       final String queryId,
       @SuppressWarnings("rawtypes") final QueryKit toolKit,
-      final MSQSpec querySpec
+      final MSQSpec querySpec,
+      final ObjectMapper jsonMapper
   )
   {
     final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
@@ -1395,7 +1398,9 @@ public class ControllerImpl implements Controller
       }
 
       // Then, add a segment-generation stage.
-      final DataSchema dataSchema = generateDataSchema(querySpec, 
querySignature, queryClusterBy, columnMappings);
+      final DataSchema dataSchema =
+          generateDataSchema(querySpec, querySignature, queryClusterBy, 
columnMappings, jsonMapper);
+
       builder.add(
           StageDefinition.builder(queryDef.getNextStageNumber())
                          .inputs(new 
StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
@@ -1421,7 +1426,8 @@ public class ControllerImpl implements Controller
       MSQSpec querySpec,
       RowSignature querySignature,
       ClusterBy queryClusterBy,
-      ColumnMappings columnMappings
+      ColumnMappings columnMappings,
+      ObjectMapper jsonMapper
   )
   {
     final DataSourceMSQDestination destination = (DataSourceMSQDestination) 
querySpec.getDestination();
@@ -1442,7 +1448,7 @@ public class ControllerImpl implements Controller
         new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
         new DimensionsSpec(dimensionsAndAggregators.lhs),
         dimensionsAndAggregators.rhs.toArray(new AggregatorFactory[0]),
-        makeGranularitySpecForIngestion(querySpec.getQuery(), 
querySpec.getColumnMappings(), isRollupQuery),
+        makeGranularitySpecForIngestion(querySpec.getQuery(), 
querySpec.getColumnMappings(), isRollupQuery, jsonMapper),
         new TransformSpec(null, Collections.emptyList())
     );
   }
@@ -1450,18 +1456,25 @@ public class ControllerImpl implements Controller
   private static GranularitySpec makeGranularitySpecForIngestion(
       final Query<?> query,
       final ColumnMappings columnMappings,
-      final boolean isRollupQuery
+      final boolean isRollupQuery,
+      final ObjectMapper jsonMapper
   )
   {
     if (isRollupQuery) {
-      final String queryGranularity = 
query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, 
"");
+      final String queryGranularityString =
+          
query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, 
"");
 
-      if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && 
!queryGranularity.isEmpty()) {
-        return new ArbitraryGranularitySpec(
-            Granularity.fromString(queryGranularity),
-            true,
-            Intervals.ONLY_ETERNITY
-        );
+      if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && 
!queryGranularityString.isEmpty()) {
+        final Granularity queryGranularity;
+
+        try {
+          queryGranularity = jsonMapper.readValue(queryGranularityString, 
Granularity.class);
+        }
+        catch (JsonProcessingException e) {
+          throw new RuntimeException(e);
+        }
+
+        return new ArbitraryGranularitySpec(queryGranularity, true, 
Intervals.ONLY_ETERNITY);
       }
       return new ArbitraryGranularitySpec(Granularities.NONE, true, 
Intervals.ONLY_ETERNITY);
     } else {
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java 
b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 0ed8e18466..624bc1cb3d 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
@@ -26,7 +27,7 @@ import org.apache.druid.query.QueryContexts.Vectorize;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
 
 import javax.annotation.Nullable;
-
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
@@ -231,16 +232,18 @@ public class QueryContext
     return QueryContexts.getAsEnum(key, get(key), clazz, defaultValue);
   }
 
-  public Granularity getGranularity(String key)
+  public Granularity getGranularity(String key, ObjectMapper jsonMapper)
   {
-    final Object value = get(key);
-    if (value == null) {
+    final String granularityString = getString(key);
+    if (granularityString == null) {
       return null;
     }
-    if (value instanceof Granularity) {
-      return (Granularity) value;
-    } else {
-      throw QueryContexts.badTypeException(key, "a Granularity", value);
+
+    try {
+      return jsonMapper.readValue(granularityString, Granularity.class);
+    }
+    catch (IOException e) {
+      throw QueryContexts.badTypeException(key, "a Granularity", 
granularityString);
     }
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
index 0ee5078efd..cda33f459e 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -31,6 +31,7 @@ import org.apache.druid.collections.BlockingPool;
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.guice.annotations.Global;
+import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Merging;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.Intervals;
@@ -98,6 +99,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
   private final Supplier<GroupByQueryConfig> configSupplier;
   private final NonBlockingPool<ByteBuffer> bufferPool;
   private final BlockingPool<ByteBuffer> mergeBufferPool;
+  private final ObjectMapper jsonMapper;
   private final ObjectMapper spillMapper;
   private final QueryWatcher queryWatcher;
 
@@ -107,6 +109,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
       Supplier<GroupByQueryConfig> configSupplier,
       @Global NonBlockingPool<ByteBuffer> bufferPool,
       @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+      @Json ObjectMapper jsonMapper,
       @Smile ObjectMapper spillMapper,
       QueryWatcher queryWatcher
   )
@@ -115,6 +118,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
     this.configSupplier = configSupplier;
     this.bufferPool = bufferPool;
     this.mergeBufferPool = mergeBufferPool;
+    this.jsonMapper = jsonMapper;
     this.spillMapper = spillMapper;
     this.queryWatcher = queryWatcher;
   }
@@ -252,7 +256,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
       // the granularity and dimensions are slightly different.
       // now, part of the query plan logic is handled in GroupByStrategyV2, 
not only in DruidQuery.toGroupByQuery()
       final Granularity timestampResultFieldGranularity
-          = 
queryContext.getGranularity(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY);
+          = 
queryContext.getGranularity(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
 jsonMapper);
       dimensionSpecs =
           query.getDimensions()
                .stream()
diff --git 
a/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java 
b/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java
index 3fed0fc1cd..b537e9d69d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java
@@ -66,6 +66,17 @@ public class RowBasedStorageAdapter<RowType> implements 
StorageAdapter
     this.rowSignature = Preconditions.checkNotNull(rowSignature, 
"rowSignature");
   }
 
+  /**
+   * Whether the provided time interval and granularity combination is allowed.
+   *
+   * We restrict ETERNITY with non-ALL granularity, because allowing it would 
involve creating a very high number
+   * of time grains. This would cause queries to take an excessive amount of 
time or run out of memory.
+   */
+  public static boolean isQueryGranularityAllowed(final Interval interval, 
final Granularity granularity)
+  {
+    return Granularities.ALL.equals(granularity) || 
!Intervals.ETERNITY.equals(interval);
+  }
+
   @Override
   public Interval getInterval()
   {
@@ -172,11 +183,13 @@ public class RowBasedStorageAdapter<RowType> implements 
StorageAdapter
     if (actualInterval == null) {
       return Sequences.empty();
     }
-    // Incase time interval is ETERNITY with non-ALL granularity, don't risk 
creating time grains.
-    // For all non-ALL granularities, the time grains will be very high in 
number and that can either OOM the heap
-    // or create an very very long query.
-    if (actualInterval.contains(Intervals.ETERNITY) && 
!gran.equals(Granularities.ALL)) {
-      throw new IAE("Cannot support ETERNITY interval with %s time 
granluarity", gran);
+
+    if (!isQueryGranularityAllowed(actualInterval, gran)) {
+      throw new IAE(
+          "Cannot support interval [%s] with granularity [%s]",
+          Intervals.ETERNITY.equals(actualInterval) ? "ETERNITY" : 
actualInterval,
+          gran
+      );
     }
 
     final RowWalker<RowType> rowWalker = new RowWalker<>(
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index 0dfdac7fb4..88f3ea260b 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -69,6 +69,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -352,6 +353,7 @@ public class GroupByLimitPushDownInsufficientBufferTest 
extends InitializedNullH
             configSupplier,
             bufferPool,
             mergePool,
+            TestHelper.makeJsonMapper(),
             new ObjectMapper(new SmileFactory()),
             NOOP_QUERYWATCHER
         )
@@ -369,6 +371,7 @@ public class GroupByLimitPushDownInsufficientBufferTest 
extends InitializedNullH
             configSupplier,
             bufferPool,
             tooSmallMergePool,
+            TestHelper.makeJsonMapper(),
             new ObjectMapper(new SmileFactory()),
             NOOP_QUERYWATCHER
         )
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 73685d7aa5..25b76e9f52 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -74,6 +74,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
@@ -601,6 +602,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
             configSupplier,
             bufferPool,
             mergePool,
+            TestHelper.makeJsonMapper(),
             new ObjectMapper(new SmileFactory()),
             NOOP_QUERYWATCHER
         )
@@ -618,6 +620,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
             configSupplier,
             bufferPool,
             mergePool2,
+            TestHelper.makeJsonMapper(),
             new ObjectMapper(new SmileFactory()),
             NOOP_QUERYWATCHER
         )
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
index b18b4d9252..ef68abbbbb 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -66,6 +66,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -266,6 +267,7 @@ public class GroupByMultiSegmentTest
             configSupplier,
             bufferPool,
             mergePool,
+            TestHelper.makeJsonMapper(),
             new ObjectMapper(new SmileFactory()),
             NOOP_QUERYWATCHER
         )
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
index 00d638af69..e31249d5b1 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -139,6 +140,7 @@ public class GroupByQueryMergeBufferTest extends 
InitializedNullHandlingTest
             configSupplier,
             BUFFER_POOL,
             MERGE_BUFFER_POOL,
+            TestHelper.makeJsonMapper(),
             mapper,
             QueryRunnerTestHelper.NOOP_QUERYWATCHER
         )
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
index 9ba93ef63d..acfc31892f 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.segment.TestHelper;
 import org.junit.AfterClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -110,6 +111,7 @@ public class GroupByQueryRunnerFailureTest
             configSupplier,
             BUFFER_POOL,
             MERGE_BUFFER_POOL,
+            TestHelper.makeJsonMapper(),
             mapper,
             QueryRunnerTestHelper.NOOP_QUERYWATCHER
         )
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index db1c689e84..f9060db4c9 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -412,6 +412,7 @@ public class GroupByQueryRunnerTest extends 
InitializedNullHandlingTest
             configSupplier,
             bufferPools.getProcessingPool(),
             bufferPools.getMergePool(),
+            TestHelper.makeJsonMapper(),
             mapper,
             QueryRunnerTestHelper.NOOP_QUERYWATCHER
         )
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
index 0877cccbb4..8204f13914 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.query.groupby;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -47,6 +49,7 @@ import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryRunnerTest;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
@@ -104,6 +107,7 @@ public class GroupByTimeseriesQueryRunnerTest extends 
TimeseriesQueryRunnerTest
         @Override
         public Sequence run(QueryPlus queryPlus, ResponseContext 
responseContext)
         {
+          final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
           TimeseriesQuery tsQuery = (TimeseriesQuery) queryPlus.getQuery();
           QueryRunner<ResultRow> newRunner = factory.mergeRunners(
               Execs.directExecutor(), ImmutableList.of(runner)
@@ -133,7 +137,15 @@ public class GroupByTimeseriesQueryRunnerTest extends 
TimeseriesQueryRunnerTest
             );
 
             theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, 
timeDimension);
-            
theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, 
granularity);
+            try {
+              theContext.put(
+                  GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
+                  jsonMapper.writeValueAsString(granularity)
+              );
+            }
+            catch (JsonProcessingException e) {
+              throw new RuntimeException(e);
+            }
             theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, 0);
           }
 
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
index 8592fca11e..f968055063 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
@@ -73,6 +73,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
@@ -311,6 +312,7 @@ public class NestedQueryPushDownTest extends 
InitializedNullHandlingTest
             configSupplier,
             bufferPool,
             mergePool,
+            TestHelper.makeJsonMapper(),
             new ObjectMapper(new SmileFactory()),
             NOOP_QUERYWATCHER
         )
@@ -328,6 +330,7 @@ public class NestedQueryPushDownTest extends 
InitializedNullHandlingTest
             configSupplier,
             bufferPool,
             mergePool2,
+            TestHelper.makeJsonMapper(),
             new ObjectMapper(new SmileFactory()),
             NOOP_QUERYWATCHER
         )
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 44b0786cb6..14197005d7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -63,6 +63,7 @@ import 
org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
 import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparator;
 import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
@@ -71,6 +72,7 @@ import org.apache.druid.query.topn.InvertedTopNMetricSpec;
 import org.apache.druid.query.topn.NumericTopNMetricSpec;
 import org.apache.druid.query.topn.TopNMetricSpec;
 import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.RowBasedStorageAdapter;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnCapabilities;
@@ -90,6 +92,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.rule.GroupByRules;
 import org.apache.druid.sql.calcite.run.EngineFeature;
 import org.apache.druid.sql.calcite.table.RowSignatures;
+import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -118,6 +121,13 @@ public class DruidQuery
    */
   public static final String CTX_SCAN_SIGNATURE = "scanSignature";
 
+  /**
+   * Maximum number of time-granular buckets that we allow for non-Druid 
tables.
+   *
+   * Used by {@link #canUseQueryGranularity}.
+   */
+  private static final int MAX_TIME_GRAINS_NON_DRUID_TABLE = 100000;
+
   private final DataSource dataSource;
   private final PlannerContext plannerContext;
 
@@ -774,6 +784,53 @@ public class DruidQuery
     return 
Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
   }
 
+  /**
+   * Whether the provided combination of dataSource, filtration, and 
queryGranularity is safe to use in queries.
+   *
+   * Necessary because some combinations are unsafe, mainly because they would 
lead to the creation of too many
+   * time-granular buckets during query processing.
+   */
+  private static boolean canUseQueryGranularity(
+      final DataSource dataSource,
+      final Filtration filtration,
+      final Granularity queryGranularity
+  )
+  {
+    if (Granularities.ALL.equals(queryGranularity)) {
+      // Always OK: no storage adapter has problem with ALL.
+      return true;
+    }
+
+    if (DataSourceAnalysis.forDataSource(dataSource).isConcreteTableBased()) {
+      // Always OK: queries on concrete tables (regular Druid datasources) use 
segment-based storage adapters
+      // (IncrementalIndex or QueryableIndex). These clip query interval to 
data interval, making wide query
+      // intervals safer. They do not have special checks for granularity and 
interval safety.
+      return true;
+    }
+
+    // Query is against something other than a regular Druid table. Apply 
additional checks, because we can't
+    // count on interval-clipping to save us.
+
+    for (final Interval filtrationInterval : filtration.getIntervals()) {
+      // Query may be using RowBasedStorageAdapter. We don't know for sure, so 
check
+      // RowBasedStorageAdapter#isQueryGranularityAllowed to be safe.
+      if 
(!RowBasedStorageAdapter.isQueryGranularityAllowed(filtrationInterval, 
queryGranularity)) {
+        return false;
+      }
+
+      // Validate the interval against MAX_TIME_GRAINS_NON_DRUID_TABLE.
+      // Estimate based on the size of the first bucket, to avoid computing 
them all. (That's what we're
+      // trying to avoid!)
+      final Interval firstBucket = 
queryGranularity.bucket(filtrationInterval.getStart());
+      final long estimatedNumBuckets = filtrationInterval.toDurationMillis() / 
firstBucket.toDurationMillis();
+      if (estimatedNumBuckets > MAX_TIME_GRAINS_NON_DRUID_TABLE) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
   public DataSource getDataSource()
   {
     return dataSource;
@@ -1004,6 +1061,10 @@ public class DruidQuery
     final DataSource newDataSource = dataSourceFiltrationPair.lhs;
     final Filtration filtration = dataSourceFiltrationPair.rhs;
 
+    if (!canUseQueryGranularity(dataSource, filtration, queryGranularity)) {
+      return null;
+    }
+
     final List<PostAggregator> postAggregators = new 
ArrayList<>(grouping.getPostAggregators());
     if (sorting != null && sorting.getProjection() != null) {
       postAggregators.addAll(sorting.getProjection().getPostAggregators());
@@ -1208,7 +1269,8 @@ public class DruidQuery
             dimensionExpression.getDruidExpression(),
             plannerContext.getExprMacroTable()
         );
-        if (granularity == null) {
+        if (granularity == null || !canUseQueryGranularity(dataSource, 
filtration, granularity)) {
+          // Can't, or won't, convert this dimension to a query granularity.
           continue;
         }
         if (queryGranularity != null) {
@@ -1219,10 +1281,20 @@ public class DruidQuery
         }
         queryGranularity = granularity;
         int timestampDimensionIndexInDimensions = 
grouping.getDimensions().indexOf(dimensionExpression);
+
         // these settings will only affect the most inner query sent to the 
down streaming compute nodes
         theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, 
dimensionExpression.getOutputName());
         theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, 
timestampDimensionIndexInDimensions);
-        theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, 
queryGranularity);
+
+        try {
+          theContext.put(
+              GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
+              
plannerContext.getJsonMapper().writeValueAsString(queryGranularity)
+          );
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
       }
     }
     if (queryGranularity == null) {
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index 617e5acd58..020166d110 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -1275,7 +1275,14 @@ public class BaseCalciteQueryTest extends CalciteTestBase
   {
     Map<String, Object> output = new HashMap<>(input);
     output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField);
-    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, 
granularity);
+
+    try {
+      output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, 
queryJsonMapper.writeValueAsString(granularity));
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+
     output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, 
timestampResultFieldIndex);
     return output;
   }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
index b019ad090b..3a1ac2db9a 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
@@ -49,7 +49,6 @@ import org.junit.runner.RunWith;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 @RunWith(JUnitParamsRunner.class)
@@ -546,10 +545,6 @@ public class CalciteCorrelatedQueryTest extends 
BaseCalciteQueryTest
       Granularity granularity
   )
   {
-    Map<String, Object> output = new HashMap<>(input);
-    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField);
-    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, 
granularity);
-    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, 0);
-    return output;
+    return withTimestampResultContext(input, timestampResultField, 0, 
granularity);
   }
 }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 0e0579c1a2..87338a40e6 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -28,7 +28,6 @@ import org.apache.calcite.runtime.CalciteContextException;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.HumanReadableBytes;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.StringUtils;
@@ -6948,6 +6947,87 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
+  @Test
+  public void testUseTimeFloorInsteadOfGranularityOnJoinResult()
+  {
+    cannotVectorize();
+
+    testQuery(
+        "WITH main AS (SELECT * FROM foo LIMIT 2)\n"
+        + "SELECT TIME_FLOOR(__time, 'PT1H') AS \"time\", dim1, COUNT(*)\n"
+        + "FROM main\n"
+        + "WHERE dim1 IN (SELECT dim1 FROM main GROUP BY 1 ORDER BY COUNT(*) 
DESC LIMIT 5)\n"
+        + "GROUP BY 1, 2",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            join(
+                                new QueryDataSource(
+                                    newScanQueryBuilder()
+                                        .dataSource(CalciteTests.DATASOURCE1)
+                                        
.intervals(querySegmentSpec(Intervals.ETERNITY))
+                                        .columns("__time", "dim1")
+                                        .limit(2)
+                                        .build()
+                                ),
+                                new QueryDataSource(
+                                    GroupByQuery.builder()
+                                                .setDataSource(
+                                                    new QueryDataSource(
+                                                        newScanQueryBuilder()
+                                                            
.dataSource(CalciteTests.DATASOURCE1)
+                                                            
.intervals(querySegmentSpec(Intervals.ETERNITY))
+                                                            .columns("dim1")
+                                                            .limit(2)
+                                                            .build()
+                                                    )
+                                                )
+                                                
.setInterval(querySegmentSpec(Intervals.ETERNITY))
+                                                
.setGranularity(Granularities.ALL)
+                                                .setDimensions(dimensions(new 
DefaultDimensionSpec("dim1", "d0")))
+                                                
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+                                                .setLimitSpec(
+                                                    new DefaultLimitSpec(
+                                                        ImmutableList.of(
+                                                            new 
OrderByColumnSpec(
+                                                                "a0",
+                                                                
Direction.DESCENDING,
+                                                                
StringComparators.NUMERIC
+                                                            )
+                                                        ),
+                                                        5
+                                                    )
+                                                )
+                                                .build()
+                                ),
+                                "j0.",
+                                "(\"dim1\" == \"j0.d0\")",
+                                JoinType.INNER
+                            )
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            expressionVirtualColumn(
+                                "v0",
+                                
"timestamp_floor(\"__time\",'PT1H',null,'UTC')",
+                                ColumnType.LONG
+                            )
+                        )
+                        .setDimensions(dimensions(
+                            new DefaultDimensionSpec("v0", "d0", 
ColumnType.LONG),
+                            new DefaultDimensionSpec("dim1", "d1")
+                        ))
+                        .setAggregatorSpecs(aggregators(new 
CountAggregatorFactory("a0")))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        NullHandling.sqlCompatible()
+        ? ImmutableList.of(new Object[]{946684800000L, "", 1L}, new 
Object[]{946771200000L, "10.1", 1L})
+        : ImmutableList.of(new Object[]{946771200000L, "10.1", 1L})
+    );
+  }
+
   @Test
   public void testMinMaxAvgDailyCountWithLimit()
   {
@@ -14168,29 +14248,47 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
   {
     // the SQL query contains an always FALSE filter ('bar' = 'baz'), which 
optimizes the query to also remove time
     // filter. the converted query hence contains ETERNITY interval but still 
a MONTH granularity due to the grouping.
-    // Such a query should fail since it will create a huge amount of time 
grains which can lead to OOM or a very very
-    // high query time.
-    Assert.assertThrows(IAE.class, () ->
-        testQuery(
-            "SELECT TIME_FLOOR(__time, 'P1m'), max(m1) from \"foo\"\n"
-            + "WHERE __time > CURRENT_TIMESTAMP - INTERVAL '3' MONTH  AND 
'bar'='baz'\n"
-            + "GROUP BY 1\n"
-            + "ORDER BY 1 DESC",
-            ImmutableList.of(
-                Druids.newTimeseriesQueryBuilder()
-                      .dataSource(
-                          InlineDataSource.fromIterable(
-                              ImmutableList.of(),
-                              RowSignature.builder().addTimeColumn().add("m1", 
ColumnType.STRING).build()
-                          ))
-                      .intervals(ImmutableList.of(Intervals.ETERNITY))
-                      .descending(true)
-                      .granularity(Granularities.MONTH)
-                      .aggregators(new LongMaxAggregatorFactory("a0", "m1"))
-                      .build()
-            ),
-            ImmutableList.of()
-        )
+    // Such a query should plan into a GroupBy query with a timestamp_floor 
function, instead of a timeseries
+    // with granularity MONTH, to avoid excessive materialization of time 
grains.
+    //
+    // See DruidQuery#canUseQueryGranularity for the relevant check.
+
+    cannotVectorize();
+
+    testQuery(
+        "SELECT TIME_FLOOR(__time, 'P1m'), max(m1) from \"foo\"\n"
+        + "WHERE __time > CURRENT_TIMESTAMP - INTERVAL '3' MONTH  AND 
'bar'='baz'\n"
+        + "GROUP BY 1\n"
+        + "ORDER BY 1 DESC",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(InlineDataSource.fromIterable(
+                            ImmutableList.of(),
+                            RowSignature.builder()
+                                        .addTimeColumn()
+                                        .add("m1", ColumnType.FLOAT)
+                                        .build()
+                        ))
+                        .setInterval(querySegmentSpec(Intervals.ETERNITY))
+                        .setVirtualColumns(expressionVirtualColumn(
+                            "v0",
+                            "timestamp_floor(\"__time\",'P1m',null,'UTC')",
+                            ColumnType.LONG
+                        ))
+                        .setGranularity(Granularities.ALL)
+                        .addDimension(new DefaultDimensionSpec("v0", "d0", 
ColumnType.LONG))
+                        .addAggregator(new FloatMaxAggregatorFactory("a0", 
"m1"))
+                        .setLimitSpec(
+                            new DefaultLimitSpec(
+                                ImmutableList.of(
+                                    new OrderByColumnSpec("d0", 
Direction.DESCENDING, StringComparators.NUMERIC)
+                                ),
+                                null
+                            )
+                        )
+                        .build()
+        ),
+        ImmutableList.of()
     );
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to