This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6aca61763e SQL: Use timestamp_floor when granularity is not safe.
(#13206)
6aca61763e is described below
commit 6aca61763ef16f232c0059c4eaade0e59acf774a
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Oct 17 08:22:45 2022 -0700
SQL: Use timestamp_floor when granularity is not safe. (#13206)
* SQL: Use timestamp_floor when granularity is not safe.
PR #12944 added a check at the execution layer to avoid materializing
an excessive number of time-granular buckets. This patch modifies the SQL
planner to avoid generating queries that would throw such errors, by
switching certain plans to use the timestamp_floor function instead of
granularities. This applies to both the Timeseries query type and the
GroupBy timestampResultFieldGranularity feature.
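The patch also goes one step further: we switch to timestamp_floor
not just in the ETERNITY + non-ALL case, but also if the estimated
number of time-granular buckets exceeds 100,000. Both checks apply only
to queries on something other than a regular Druid table; queries on
regular Druid tables continue to use granularities as before.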
Finally, the patch modifies the timestampResultFieldGranularity
field to consistently be a String rather than a Granularity. This
ensures that it can be round-trip serialized and deserialized, which is
useful when trying to execute the results of "EXPLAIN PLAN FOR" with
GroupBy queries that use the timestampResultFieldGranularity feature.
* Fix test, address PR comments.
* Fix ControllerImpl.
* Fix test.
* Fix unused import.
---
.../benchmark/GroupByTypeInterfaceBenchmark.java | 2 +
.../query/CachingClusteredClientBenchmark.java | 1 +
.../druid/benchmark/query/GroupByBenchmark.java | 2 +
.../druid/segment/MapVirtualColumnGroupByTest.java | 1 +
.../org/apache/druid/msq/exec/ControllerImpl.java | 39 ++++--
.../java/org/apache/druid/query/QueryContext.java | 19 +--
.../query/groupby/strategy/GroupByStrategyV2.java | 6 +-
.../druid/segment/RowBasedStorageAdapter.java | 23 +++-
...GroupByLimitPushDownInsufficientBufferTest.java | 3 +
.../GroupByLimitPushDownMultiNodeMergeTest.java | 3 +
.../query/groupby/GroupByMultiSegmentTest.java | 2 +
.../query/groupby/GroupByQueryMergeBufferTest.java | 2 +
.../groupby/GroupByQueryRunnerFailureTest.java | 2 +
.../query/groupby/GroupByQueryRunnerTest.java | 1 +
.../groupby/GroupByTimeseriesQueryRunnerTest.java | 14 +-
.../query/groupby/NestedQueryPushDownTest.java | 3 +
.../apache/druid/sql/calcite/rel/DruidQuery.java | 76 ++++++++++-
.../druid/sql/calcite/BaseCalciteQueryTest.java | 9 +-
.../sql/calcite/CalciteCorrelatedQueryTest.java | 7 +-
.../apache/druid/sql/calcite/CalciteQueryTest.java | 146 +++++++++++++++++----
20 files changed, 300 insertions(+), 61 deletions(-)
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index 8e885856ba..cd6c48a16e 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -66,6 +66,7 @@ import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
@@ -401,6 +402,7 @@ public class GroupByTypeInterfaceBenchmark
configSupplier,
bufferPool,
mergePool,
+ TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
)
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index bc480c1063..4b0d55c2c6 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -378,6 +378,7 @@ public class CachingClusteredClientBenchmark
bufferPool,
mergeBufferPool,
mapper,
+ mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 9dd0ea5c5e..602126ba4a 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -81,6 +81,7 @@ import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.generator.DataGenerator;
@@ -516,6 +517,7 @@ public class GroupByBenchmark
configSupplier,
bufferPool,
mergePool,
+ TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
)
diff --git
a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
index 3e0cda4aed..cd746aaff7 100644
---
a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
+++
b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
@@ -101,6 +101,7 @@ public class MapVirtualColumnGroupByTest extends
InitializedNullHandlingTest
GroupByQueryConfig::new,
new StupidPool<>("map-virtual-column-groupby-test", () ->
ByteBuffer.allocate(1024)),
new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1),
+ TestHelper.makeJsonMapper(),
new DefaultObjectMapper(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index b5bb6bdd2b..cf5a8902a1 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -19,6 +19,7 @@
package org.apache.druid.msq.exec;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -523,7 +524,8 @@ public class ControllerImpl implements Controller
final QueryDefinition queryDef = makeQueryDefinition(
id(),
makeQueryControllerToolKit(),
- task.getQuerySpec()
+ task.getQuerySpec(),
+ context.jsonMapper()
);
QueryValidator.validateQueryDef(queryDef);
@@ -1311,7 +1313,8 @@ public class ControllerImpl implements Controller
private static QueryDefinition makeQueryDefinition(
final String queryId,
@SuppressWarnings("rawtypes") final QueryKit toolKit,
- final MSQSpec querySpec
+ final MSQSpec querySpec,
+ final ObjectMapper jsonMapper
)
{
final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
@@ -1395,7 +1398,9 @@ public class ControllerImpl implements Controller
}
// Then, add a segment-generation stage.
- final DataSchema dataSchema = generateDataSchema(querySpec,
querySignature, queryClusterBy, columnMappings);
+ final DataSchema dataSchema =
+ generateDataSchema(querySpec, querySignature, queryClusterBy,
columnMappings, jsonMapper);
+
builder.add(
StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new
StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
@@ -1421,7 +1426,8 @@ public class ControllerImpl implements Controller
MSQSpec querySpec,
RowSignature querySignature,
ClusterBy queryClusterBy,
- ColumnMappings columnMappings
+ ColumnMappings columnMappings,
+ ObjectMapper jsonMapper
)
{
final DataSourceMSQDestination destination = (DataSourceMSQDestination)
querySpec.getDestination();
@@ -1442,7 +1448,7 @@ public class ControllerImpl implements Controller
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
new DimensionsSpec(dimensionsAndAggregators.lhs),
dimensionsAndAggregators.rhs.toArray(new AggregatorFactory[0]),
- makeGranularitySpecForIngestion(querySpec.getQuery(),
querySpec.getColumnMappings(), isRollupQuery),
+ makeGranularitySpecForIngestion(querySpec.getQuery(),
querySpec.getColumnMappings(), isRollupQuery, jsonMapper),
new TransformSpec(null, Collections.emptyList())
);
}
@@ -1450,18 +1456,25 @@ public class ControllerImpl implements Controller
private static GranularitySpec makeGranularitySpecForIngestion(
final Query<?> query,
final ColumnMappings columnMappings,
- final boolean isRollupQuery
+ final boolean isRollupQuery,
+ final ObjectMapper jsonMapper
)
{
if (isRollupQuery) {
- final String queryGranularity =
query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
"");
+ final String queryGranularityString =
+
query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
"");
- if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) &&
!queryGranularity.isEmpty()) {
- return new ArbitraryGranularitySpec(
- Granularity.fromString(queryGranularity),
- true,
- Intervals.ONLY_ETERNITY
- );
+ if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) &&
!queryGranularityString.isEmpty()) {
+ final Granularity queryGranularity;
+
+ try {
+ queryGranularity = jsonMapper.readValue(queryGranularityString,
Granularity.class);
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+
+ return new ArbitraryGranularitySpec(queryGranularity, true,
Intervals.ONLY_ETERNITY);
}
return new ArbitraryGranularitySpec(Granularities.NONE, true,
Intervals.ONLY_ETERNITY);
} else {
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java
b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 0ed8e18466..624bc1cb3d 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -19,6 +19,7 @@
package org.apache.druid.query;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -26,7 +27,7 @@ import org.apache.druid.query.QueryContexts.Vectorize;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import javax.annotation.Nullable;
-
+import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -231,16 +232,18 @@ public class QueryContext
return QueryContexts.getAsEnum(key, get(key), clazz, defaultValue);
}
- public Granularity getGranularity(String key)
+ public Granularity getGranularity(String key, ObjectMapper jsonMapper)
{
- final Object value = get(key);
- if (value == null) {
+ final String granularityString = getString(key);
+ if (granularityString == null) {
return null;
}
- if (value instanceof Granularity) {
- return (Granularity) value;
- } else {
- throw QueryContexts.badTypeException(key, "a Granularity", value);
+
+ try {
+ return jsonMapper.readValue(granularityString, Granularity.class);
+ }
+ catch (IOException e) {
+ throw QueryContexts.badTypeException(key, "a Granularity",
granularityString);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
index 0ee5078efd..cda33f459e 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -31,6 +31,7 @@ import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.guice.annotations.Global;
+import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.Intervals;
@@ -98,6 +99,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
private final Supplier<GroupByQueryConfig> configSupplier;
private final NonBlockingPool<ByteBuffer> bufferPool;
private final BlockingPool<ByteBuffer> mergeBufferPool;
+ private final ObjectMapper jsonMapper;
private final ObjectMapper spillMapper;
private final QueryWatcher queryWatcher;
@@ -107,6 +109,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
Supplier<GroupByQueryConfig> configSupplier,
@Global NonBlockingPool<ByteBuffer> bufferPool,
@Merging BlockingPool<ByteBuffer> mergeBufferPool,
+ @Json ObjectMapper jsonMapper,
@Smile ObjectMapper spillMapper,
QueryWatcher queryWatcher
)
@@ -115,6 +118,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
this.configSupplier = configSupplier;
this.bufferPool = bufferPool;
this.mergeBufferPool = mergeBufferPool;
+ this.jsonMapper = jsonMapper;
this.spillMapper = spillMapper;
this.queryWatcher = queryWatcher;
}
@@ -252,7 +256,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
// the granularity and dimensions are slightly different.
// now, part of the query plan logic is handled in GroupByStrategyV2,
not only in DruidQuery.toGroupByQuery()
final Granularity timestampResultFieldGranularity
- =
queryContext.getGranularity(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY);
+ =
queryContext.getGranularity(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
jsonMapper);
dimensionSpecs =
query.getDimensions()
.stream()
diff --git
a/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java
b/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java
index 3fed0fc1cd..b537e9d69d 100644
---
a/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java
+++
b/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java
@@ -66,6 +66,17 @@ public class RowBasedStorageAdapter<RowType> implements
StorageAdapter
this.rowSignature = Preconditions.checkNotNull(rowSignature,
"rowSignature");
}
+ /**
+ * Whether the provided time interval and granularity combination is allowed.
+ *
+ * We restrict ETERNITY with non-ALL granularity, because allowing it would
involve creating a very high number
+ * of time grains. This would cause queries to take an excessive amount of
time or run out of memory.
+ */
+ public static boolean isQueryGranularityAllowed(final Interval interval,
final Granularity granularity)
+ {
+ return Granularities.ALL.equals(granularity) ||
!Intervals.ETERNITY.equals(interval);
+ }
+
@Override
public Interval getInterval()
{
@@ -172,11 +183,13 @@ public class RowBasedStorageAdapter<RowType> implements
StorageAdapter
if (actualInterval == null) {
return Sequences.empty();
}
- // Incase time interval is ETERNITY with non-ALL granularity, don't risk
creating time grains.
- // For all non-ALL granularities, the time grains will be very high in
number and that can either OOM the heap
- // or create an very very long query.
- if (actualInterval.contains(Intervals.ETERNITY) &&
!gran.equals(Granularities.ALL)) {
- throw new IAE("Cannot support ETERNITY interval with %s time
granluarity", gran);
+
+ if (!isQueryGranularityAllowed(actualInterval, gran)) {
+ throw new IAE(
+ "Cannot support interval [%s] with granularity [%s]",
+ Intervals.ETERNITY.equals(actualInterval) ? "ETERNITY" :
actualInterval,
+ gran
+ );
}
final RowWalker<RowType> rowWalker = new RowWalker<>(
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index 0dfdac7fb4..88f3ea260b 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -69,6 +69,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -352,6 +353,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
extends InitializedNullH
configSupplier,
bufferPool,
mergePool,
+ TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
@@ -369,6 +371,7 @@ public class GroupByLimitPushDownInsufficientBufferTest
extends InitializedNullH
configSupplier,
bufferPool,
tooSmallMergePool,
+ TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 73685d7aa5..25b76e9f52 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -74,6 +74,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
@@ -601,6 +602,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
configSupplier,
bufferPool,
mergePool,
+ TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
@@ -618,6 +620,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
configSupplier,
bufferPool,
mergePool2,
+ TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
index b18b4d9252..ef68abbbbb 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -66,6 +66,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -266,6 +267,7 @@ public class GroupByMultiSegmentTest
configSupplier,
bufferPool,
mergePool,
+ TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
index 00d638af69..e31249d5b1 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -139,6 +140,7 @@ public class GroupByQueryMergeBufferTest extends
InitializedNullHandlingTest
configSupplier,
BUFFER_POOL,
MERGE_BUFFER_POOL,
+ TestHelper.makeJsonMapper(),
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
index 9ba93ef63d..acfc31892f 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.segment.TestHelper;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
@@ -110,6 +111,7 @@ public class GroupByQueryRunnerFailureTest
configSupplier,
BUFFER_POOL,
MERGE_BUFFER_POOL,
+ TestHelper.makeJsonMapper(),
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index db1c689e84..f9060db4c9 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -412,6 +412,7 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
configSupplier,
bufferPools.getProcessingPool(),
bufferPools.getMergePool(),
+ TestHelper.makeJsonMapper(),
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
index 0877cccbb4..8204f13914 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.query.groupby;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -47,6 +49,7 @@ import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerTest;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
@@ -104,6 +107,7 @@ public class GroupByTimeseriesQueryRunnerTest extends
TimeseriesQueryRunnerTest
@Override
public Sequence run(QueryPlus queryPlus, ResponseContext
responseContext)
{
+ final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
TimeseriesQuery tsQuery = (TimeseriesQuery) queryPlus.getQuery();
QueryRunner<ResultRow> newRunner = factory.mergeRunners(
Execs.directExecutor(), ImmutableList.of(runner)
@@ -133,7 +137,15 @@ public class GroupByTimeseriesQueryRunnerTest extends
TimeseriesQueryRunnerTest
);
theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD,
timeDimension);
-
theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
granularity);
+ try {
+ theContext.put(
+ GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
+ jsonMapper.writeValueAsString(granularity)
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, 0);
}
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
index 8592fca11e..f968055063 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
@@ -73,6 +73,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
@@ -311,6 +312,7 @@ public class NestedQueryPushDownTest extends
InitializedNullHandlingTest
configSupplier,
bufferPool,
mergePool,
+ TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
@@ -328,6 +330,7 @@ public class NestedQueryPushDownTest extends
InitializedNullHandlingTest
configSupplier,
bufferPool,
mergePool2,
+ TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 44b0786cb6..14197005d7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -63,6 +63,7 @@ import
org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeseries.TimeseriesQuery;
@@ -71,6 +72,7 @@ import org.apache.druid.query.topn.InvertedTopNMetricSpec;
import org.apache.druid.query.topn.NumericTopNMetricSpec;
import org.apache.druid.query.topn.TopNMetricSpec;
import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.RowBasedStorageAdapter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
@@ -90,6 +92,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rule.GroupByRules;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.table.RowSignatures;
+import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -118,6 +121,13 @@ public class DruidQuery
*/
public static final String CTX_SCAN_SIGNATURE = "scanSignature";
+ /**
+ * Maximum number of time-granular buckets that we allow for non-Druid
tables.
+ *
+ * Used by {@link #canUseQueryGranularity}.
+ */
+ private static final int MAX_TIME_GRAINS_NON_DRUID_TABLE = 100000;
+
private final DataSource dataSource;
private final PlannerContext plannerContext;
@@ -774,6 +784,53 @@ public class DruidQuery
return
Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
}
+ /**
+ * Whether the provided combination of dataSource, filtration, and
queryGranularity is safe to use in queries.
+ *
+ * Necessary because some combinations are unsafe, mainly because they would
lead to the creation of too many
+ * time-granular buckets during query processing.
+ */
+ private static boolean canUseQueryGranularity(
+ final DataSource dataSource,
+ final Filtration filtration,
+ final Granularity queryGranularity
+ )
+ {
+ if (Granularities.ALL.equals(queryGranularity)) {
+ // Always OK: no storage adapter has problem with ALL.
+ return true;
+ }
+
+ if (DataSourceAnalysis.forDataSource(dataSource).isConcreteTableBased()) {
+ // Always OK: queries on concrete tables (regular Druid datasources) use
segment-based storage adapters
+ // (IncrementalIndex or QueryableIndex). These clip query interval to
data interval, making wide query
+ // intervals safer. They do not have special checks for granularity and
interval safety.
+ return true;
+ }
+
+ // Query is against something other than a regular Druid table. Apply
additional checks, because we can't
+ // count on interval-clipping to save us.
+
+ for (final Interval filtrationInterval : filtration.getIntervals()) {
+ // Query may be using RowBasedStorageAdapter. We don't know for sure, so
check
+ // RowBasedStorageAdapter#isQueryGranularityAllowed to be safe.
+ if
(!RowBasedStorageAdapter.isQueryGranularityAllowed(filtrationInterval,
queryGranularity)) {
+ return false;
+ }
+
+ // Validate the interval against MAX_TIME_GRAINS_NON_DRUID_TABLE.
+ // Estimate based on the size of the first bucket, to avoid computing
them all. (That's what we're
+ // trying to avoid!)
+ final Interval firstBucket =
queryGranularity.bucket(filtrationInterval.getStart());
+ final long estimatedNumBuckets = filtrationInterval.toDurationMillis() /
firstBucket.toDurationMillis();
+ if (estimatedNumBuckets > MAX_TIME_GRAINS_NON_DRUID_TABLE) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
public DataSource getDataSource()
{
return dataSource;
@@ -1004,6 +1061,10 @@ public class DruidQuery
final DataSource newDataSource = dataSourceFiltrationPair.lhs;
final Filtration filtration = dataSourceFiltrationPair.rhs;
+ if (!canUseQueryGranularity(dataSource, filtration, queryGranularity)) {
+ return null;
+ }
+
final List<PostAggregator> postAggregators = new
ArrayList<>(grouping.getPostAggregators());
if (sorting != null && sorting.getProjection() != null) {
postAggregators.addAll(sorting.getProjection().getPostAggregators());
@@ -1208,7 +1269,8 @@ public class DruidQuery
dimensionExpression.getDruidExpression(),
plannerContext.getExprMacroTable()
);
- if (granularity == null) {
+ if (granularity == null || !canUseQueryGranularity(dataSource,
filtration, granularity)) {
+ // Can't, or won't, convert this dimension to a query granularity.
continue;
}
if (queryGranularity != null) {
@@ -1219,10 +1281,20 @@ public class DruidQuery
}
queryGranularity = granularity;
int timestampDimensionIndexInDimensions =
grouping.getDimensions().indexOf(dimensionExpression);
+
// these settings will only affect the most inner query sent to the
down streaming compute nodes
theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD,
dimensionExpression.getOutputName());
theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX,
timestampDimensionIndexInDimensions);
- theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
queryGranularity);
+
+ try {
+ theContext.put(
+ GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
+
plannerContext.getJsonMapper().writeValueAsString(queryGranularity)
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
if (queryGranularity == null) {
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index 617e5acd58..020166d110 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -1275,7 +1275,14 @@ public class BaseCalciteQueryTest extends CalciteTestBase
{
Map<String, Object> output = new HashMap<>(input);
output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField);
- output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
granularity);
+
+ try {
+ output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
queryJsonMapper.writeValueAsString(granularity));
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+
output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX,
timestampResultFieldIndex);
return output;
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
index b019ad090b..3a1ac2db9a 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
@@ -49,7 +49,6 @@ import org.junit.runner.RunWith;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
@RunWith(JUnitParamsRunner.class)
@@ -546,10 +545,6 @@ public class CalciteCorrelatedQueryTest extends
BaseCalciteQueryTest
Granularity granularity
)
{
- Map<String, Object> output = new HashMap<>(input);
- output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField);
- output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
granularity);
- output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, 0);
- return output;
+ return withTimestampResultContext(input, timestampResultField, 0,
granularity);
}
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 0e0579c1a2..87338a40e6 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -28,7 +28,6 @@ import org.apache.calcite.runtime.CalciteContextException;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.HumanReadableBytes;
-import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
@@ -6948,6 +6947,87 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
);
}
+ @Test
+ public void testUseTimeFloorInsteadOfGranularityOnJoinResult()
+ {
+ cannotVectorize();
+
+ testQuery(
+ "WITH main AS (SELECT * FROM foo LIMIT 2)\n"
+ + "SELECT TIME_FLOOR(__time, 'PT1H') AS \"time\", dim1, COUNT(*)\n"
+ + "FROM main\n"
+ + "WHERE dim1 IN (SELECT dim1 FROM main GROUP BY 1 ORDER BY COUNT(*)
DESC LIMIT 5)\n"
+ + "GROUP BY 1, 2",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ join(
+ new QueryDataSource(
+ newScanQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+
.intervals(querySegmentSpec(Intervals.ETERNITY))
+ .columns("__time", "dim1")
+ .limit(2)
+ .build()
+ ),
+ new QueryDataSource(
+ GroupByQuery.builder()
+ .setDataSource(
+ new QueryDataSource(
+ newScanQueryBuilder()
+
.dataSource(CalciteTests.DATASOURCE1)
+
.intervals(querySegmentSpec(Intervals.ETERNITY))
+ .columns("dim1")
+ .limit(2)
+ .build()
+ )
+ )
+
.setInterval(querySegmentSpec(Intervals.ETERNITY))
+
.setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new
DefaultDimensionSpec("dim1", "d0")))
+
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ ImmutableList.of(
+ new
OrderByColumnSpec(
+ "a0",
+
Direction.DESCENDING,
+
StringComparators.NUMERIC
+ )
+ ),
+ 5
+ )
+ )
+ .build()
+ ),
+ "j0.",
+ "(\"dim1\" == \"j0.d0\")",
+ JoinType.INNER
+ )
+ )
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setVirtualColumns(
+ expressionVirtualColumn(
+ "v0",
+
"timestamp_floor(\"__time\",'PT1H',null,'UTC')",
+ ColumnType.LONG
+ )
+ )
+ .setDimensions(dimensions(
+ new DefaultDimensionSpec("v0", "d0",
ColumnType.LONG),
+ new DefaultDimensionSpec("dim1", "d1")
+ ))
+ .setAggregatorSpecs(aggregators(new
CountAggregatorFactory("a0")))
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ NullHandling.sqlCompatible()
+ ? ImmutableList.of(new Object[]{946684800000L, "", 1L}, new
Object[]{946771200000L, "10.1", 1L})
+ : ImmutableList.of(new Object[]{946771200000L, "10.1", 1L})
+ );
+ }
+
@Test
public void testMinMaxAvgDailyCountWithLimit()
{
@@ -14168,29 +14248,47 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
{
// the SQL query contains an always FALSE filter ('bar' = 'baz'), which
optimizes the query to also remove time
// filter. the converted query hence contains ETERNITY interval but still
a MONTH granularity due to the grouping.
- // Such a query should fail since it will create a huge amount of time
grains which can lead to OOM or a very very
- // high query time.
- Assert.assertThrows(IAE.class, () ->
- testQuery(
- "SELECT TIME_FLOOR(__time, 'P1m'), max(m1) from \"foo\"\n"
- + "WHERE __time > CURRENT_TIMESTAMP - INTERVAL '3' MONTH AND
'bar'='baz'\n"
- + "GROUP BY 1\n"
- + "ORDER BY 1 DESC",
- ImmutableList.of(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(
- InlineDataSource.fromIterable(
- ImmutableList.of(),
- RowSignature.builder().addTimeColumn().add("m1",
ColumnType.STRING).build()
- ))
- .intervals(ImmutableList.of(Intervals.ETERNITY))
- .descending(true)
- .granularity(Granularities.MONTH)
- .aggregators(new LongMaxAggregatorFactory("a0", "m1"))
- .build()
- ),
- ImmutableList.of()
- )
+ // Such a query should plan into a GroupBy query with a timestamp_floor
function, instead of a timeseries
+ // with granularity MONTH, to avoid excessive materialization of time
grains.
+ //
+ // See DruidQuery#canUseQueryGranularity for the relevant check.
+
+ cannotVectorize();
+
+ testQuery(
+ "SELECT TIME_FLOOR(__time, 'P1m'), max(m1) from \"foo\"\n"
+ + "WHERE __time > CURRENT_TIMESTAMP - INTERVAL '3' MONTH AND
'bar'='baz'\n"
+ + "GROUP BY 1\n"
+ + "ORDER BY 1 DESC",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(InlineDataSource.fromIterable(
+ ImmutableList.of(),
+ RowSignature.builder()
+ .addTimeColumn()
+ .add("m1", ColumnType.FLOAT)
+ .build()
+ ))
+ .setInterval(querySegmentSpec(Intervals.ETERNITY))
+ .setVirtualColumns(expressionVirtualColumn(
+ "v0",
+ "timestamp_floor(\"__time\",'P1m',null,'UTC')",
+ ColumnType.LONG
+ ))
+ .setGranularity(Granularities.ALL)
+ .addDimension(new DefaultDimensionSpec("v0", "d0",
ColumnType.LONG))
+ .addAggregator(new FloatMaxAggregatorFactory("a0",
"m1"))
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ ImmutableList.of(
+ new OrderByColumnSpec("d0",
Direction.DESCENDING, StringComparators.NUMERIC)
+ ),
+ null
+ )
+ )
+ .build()
+ ),
+ ImmutableList.of()
);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]