This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0df0bff Enable multiple distinct aggregators in same query (#11014)
0df0bff is described below
commit 0df0bff44b0bbf3b6655bb0cffc384f58e69ac1f
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Wed Apr 7 13:22:19 2021 +0530
Enable multiple distinct aggregators in same query (#11014)
* Enable multiple distinct count
* Add more tests
* fix sql test
* docs fix
* Address nits
---
docs/configuration/index.md | 1 +
docs/querying/sql.md | 3 +-
.../query/groupby/strategy/GroupByStrategyV2.java | 35 +++--
.../query/groupby/GroupByQueryMergeBufferTest.java | 171 +++++++++++++++++++--
.../QueryRunnerBasedOnClusteredClientTestBase.java | 5 +-
.../org/apache/druid/server/QueryStackTests.java | 20 ++-
.../druid/sql/calcite/planner/PlannerConfig.java | 14 ++
.../apache/druid/sql/calcite/planner/Rules.java | 8 +-
.../druid/sql/avatica/DruidAvaticaHandlerTest.java | 12 ++
.../druid/sql/calcite/BaseCalciteQueryTest.java | 12 ++
.../sql/calcite/CalciteCorrelatedQueryTest.java | 85 ++--------
.../apache/druid/sql/calcite/CalciteQueryTest.java | 72 +++++++++
.../druid/sql/calcite/util/CalciteTests.java | 60 ++++++++
website/.spelling | 2 +
14 files changed, 396 insertions(+), 104 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 2e3a56d..7128194 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1709,6 +1709,7 @@ The Druid SQL server is configured through the following properties on the Broke
 |`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000|
 |`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
 |`druid.sql.planner.useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|true|
+|`druid.sql.planner.useGroupingSetForExactDistinct`|Only relevant when `useApproximateCountDistinct` is disabled. If set to true, exact distinct queries are rewritten using grouping sets; otherwise they are rewritten using joins. This should be set to true for groupBy queries with multiple exact distinct aggregations. This flag can be overridden per query.|false|
 |`druid.sql.planner.useApproximateTopN`|Whether to use approximate [TopN queries](../querying/topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](../querying/groupbyquery.md) will be used instead.|true|
 |`druid.sql.planner.requireTimeCondition`|Whether to require SQL to have filter conditions on __time column so that all generated native queries will have user specified intervals. If true, all queries without filter condition on __time column will fail|false|
 |`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC|
diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index c5ba6ac..12c1d3f 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -322,7 +322,7 @@ Only the COUNT aggregation can accept DISTINCT.
|Function|Notes|
|--------|-----|
|`COUNT(*)`|Counts the number of rows.|
-|`COUNT(DISTINCT expr)`|Counts distinct values of expr, which can be string, numeric, or hyperUnique. By default this is approximate, using a variant of [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf). To get exact counts set "useApproximateCountDistinct" to "false". If you do this, expr must be string or numeric, since exact counts are not possible using hyperUnique columns. See also `APPROX_COUNT_DISTINCT(expr)`. In exact mode, only one distinct count per query [...]
+|`COUNT(DISTINCT expr)`|Counts distinct values of expr, which can be string, numeric, or hyperUnique. By default this is approximate, using a variant of [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf). To get exact counts set "useApproximateCountDistinct" to "false". If you do this, expr must be string or numeric, since exact counts are not possible using hyperUnique columns. See also `APPROX_COUNT_DISTINCT(expr)`. In exact mode, only one distinct count per query [...]
|`SUM(expr)`|Sums numbers.|
|`MIN(expr)`|Takes the minimum of numbers.|
|`MAX(expr)`|Takes the maximum of numbers.|
@@ -1015,6 +1015,7 @@ Connection context can be specified as JDBC connection properties or as a "conte
 |`sqlQueryId`|Unique identifier given to this SQL query. For HTTP client, it will be returned in `X-Druid-SQL-Query-Id` header.|auto-generated|
 |`sqlTimeZone`|Sets the time zone for this connection, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|druid.sql.planner.sqlTimeZone on the Broker (default: UTC)|
 |`useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|druid.sql.planner.useApproximateCountDistinct on the Broker (default: true)|
+|`useGroupingSetForExactDistinct`|Whether to use grouping sets to execute queries with multiple exact distinct aggregations.|druid.sql.planner.useGroupingSetForExactDistinct on the Broker (default: false)|
 |`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.md) will be used instead.|druid.sql.planner.useApproximateTopN on the Broker (default: true)|
## Metadata tables
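
For JDBC users, a minimal sketch of passing the same context keys as connection properties through the documented Avatica endpoint (the broker address and the "visits" table are assumptions for illustration):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Properties;

    public class ExactDistinctOverJdbc
    {
      public static void main(String[] args) throws Exception
      {
        // Connection context entries become the query context for this connection.
        Properties context = new Properties();
        context.setProperty("useApproximateCountDistinct", "false");
        context.setProperty("useGroupingSetForExactDistinct", "true");
        String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
        try (Connection connection = DriverManager.getConnection(url, context);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(
                 "SELECT COUNT(DISTINCT city), COUNT(DISTINCT \"user\") FROM visits")) {
          while (resultSet.next()) {
            System.out.println(resultSet.getLong(1) + ", " + resultSet.getLong(2));
          }
        }
      }
    }
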
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
index 95338f5..23d904a 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.groupby.strategy;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
@@ -85,8 +86,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";
public static final String CTX_KEY_OUTERMOST = "groupByOutermost";
- // see countRequiredMergeBufferNum() for explanation
- private static final int MAX_MERGE_BUFFER_NUM = 2;
+ // see countRequiredMergeBufferNumWithoutSubtotal() for explanation
+ private static final int MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL = 2;
private final DruidProcessingConfig processingConfig;
private final Supplier<GroupByQueryConfig> configSupplier;
@@ -116,8 +117,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
@Override
public GroupByQueryResource prepareResource(GroupByQuery query)
{
-    final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1) +
-                                       numMergeBuffersNeededForSubtotalsSpec(query);
+    final int requiredMergeBufferNum = countRequiredMergeBufferNum(query);
if (requiredMergeBufferNum > mergeBufferPool.maxSize()) {
throw new ResourceLimitExceededException(
@@ -146,7 +146,13 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
}
- private static int countRequiredMergeBufferNum(Query query, int foundNum)
+  @VisibleForTesting
+  public static int countRequiredMergeBufferNum(GroupByQuery query)
+  {
+    return countRequiredMergeBufferNumWithoutSubtotal(query, 1) + numMergeBuffersNeededForSubtotalsSpec(query);
+  }
+
+  private static int countRequiredMergeBufferNumWithoutSubtotal(Query query, int foundNum)
{
     // Note: A broker requires merge buffers for processing the groupBy layers beyond the inner-most one.
     // For example, the number of required merge buffers for a nested groupBy (groupBy -> groupBy -> table) is 1.
@@ -156,10 +162,10 @@ public class GroupByStrategyV2 implements GroupByStrategy
     // This is same for subsequent groupBy layers, and thus the maximum number of required merge buffers becomes 2.
final DataSource dataSource = query.getDataSource();
-    if (foundNum == MAX_MERGE_BUFFER_NUM + 1 || !(dataSource instanceof QueryDataSource)) {
+    if (foundNum == MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL + 1 || !(dataSource instanceof QueryDataSource)) {
       return foundNum - 1;
     } else {
-      return countRequiredMergeBufferNum(((QueryDataSource) dataSource).getQuery(), foundNum + 1);
+      return countRequiredMergeBufferNumWithoutSubtotal(((QueryDataSource) dataSource).getQuery(), foundNum + 1);
}
}
@@ -522,11 +528,20 @@ public class GroupByStrategyV2 implements GroupByStrategy
return aggsAndPostAggs;
}
- private int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query)
+ private static int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query)
{
List<List<String>> subtotalSpecs = query.getSubtotalsSpec();
+ final DataSource dataSource = query.getDataSource();
+ int numMergeBuffersNeededForSubQuerySubtotal = 0;
+ if (dataSource instanceof QueryDataSource) {
+ Query<?> subQuery = ((QueryDataSource) dataSource).getQuery();
+ if (subQuery instanceof GroupByQuery) {
+        numMergeBuffersNeededForSubQuerySubtotal = numMergeBuffersNeededForSubtotalsSpec((GroupByQuery) subQuery);
+ }
+
+ }
if (subtotalSpecs == null || subtotalSpecs.size() == 0) {
- return 0;
+ return numMergeBuffersNeededForSubQuerySubtotal;
}
     List<String> queryDimOutputNames = query.getDimensions().stream().map(DimensionSpec::getOutputName).collect(
         Collectors.toList());
@@ -537,7 +552,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
}
- return 1;
+ return Math.max(1, numMergeBuffersNeededForSubQuerySubtotal);
}
@Override
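
To make the counting rules above concrete, here is a standalone toy sketch of the two counting methods (not Druid's classes; the prefix test on subtotalsSpec is reduced to a precomputed boolean). Each groupBy layer above the innermost needs one broker merge buffer, capped at 2; a subtotalsSpec adds 1 when every subtotal is a prefix of the dimensions and 2 otherwise, taking the maximum with whatever the sub-query's own subtotals need.

    // Toy model of the counting above; assumptions are noted in the comments.
    final class ToyGroupBy
    {
      static final int MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL = 2;

      final ToyGroupBy subQuery;          // null means the query scans a table directly
      final boolean hasSubtotals;         // true if subtotalsSpec is present and non-empty
      final boolean subtotalsArePrefixes; // true if every subtotal is a prefix of the dimensions

      ToyGroupBy(ToyGroupBy subQuery, boolean hasSubtotals, boolean subtotalsArePrefixes)
      {
        this.subQuery = subQuery;
        this.hasSubtotals = hasSubtotals;
        this.subtotalsArePrefixes = subtotalsArePrefixes;
      }

      static int countRequiredMergeBufferNum(ToyGroupBy query)
      {
        return countWithoutSubtotal(query, 1) + countForSubtotals(query);
      }

      private static int countWithoutSubtotal(ToyGroupBy query, int foundNum)
      {
        // Each groupBy layer above the innermost needs one buffer, capped at 2.
        if (foundNum == MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL + 1 || query.subQuery == null) {
          return foundNum - 1;
        }
        return countWithoutSubtotal(query.subQuery, foundNum + 1);
      }

      private static int countForSubtotals(ToyGroupBy query)
      {
        int fromSubQuery = query.subQuery == null ? 0 : countForSubtotals(query.subQuery);
        if (!query.hasSubtotals) {
          return fromSubQuery;
        }
        return Math.max(query.subtotalsArePrefixes ? 1 : 2, fromSubQuery);
      }

      public static void main(String[] args)
      {
        ToyGroupBy inner = new ToyGroupBy(null, true, false);  // nested, non-prefix subtotals
        ToyGroupBy outer = new ToyGroupBy(inner, true, false);
        // Matches testNestedGroupByWithNestedSubtotals below: 1 (nesting) + 2 (subtotals) = 3.
        System.out.println(countRequiredMergeBufferNum(outer)); // prints 3
      }
    }
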
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
index 414a89f..faed09a 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
@@ -51,6 +51,7 @@ import org.junit.runners.Parameterized.Parameters;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -109,7 +110,7 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
@Override
public int getNumMergeBuffers()
{
- return 3;
+ return 4;
}
@Override
@@ -211,10 +212,11 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
         .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
         .build();
+    Assert.assertEquals(0, GroupByStrategyV2.countRequiredMergeBufferNum(query));
     GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
-    Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum());
-    Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
+    Assert.assertEquals(3, MERGE_BUFFER_POOL.getMinRemainBufferNum());
+    Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
@Test
@@ -239,10 +241,11 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
         .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
         .build();
+    Assert.assertEquals(1, GroupByStrategyV2.countRequiredMergeBufferNum(query));
     GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
-    Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
-    Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
+    Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum());
+    Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
@Test
@@ -278,11 +281,12 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
         .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
         .build();
+    Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query));
     GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
-    // This should be 0 because the broker needs 2 buffers and the queryable node needs one.
-    Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum());
-    Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
+    // This should be 1 because the broker needs 2 buffers and the queryable node needs one.
+    Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
+    Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
@Test
@@ -331,10 +335,157 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
         .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
         .build();
+    Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query));
+    GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
+
+    // This should be 1 because the broker needs 2 buffers and the queryable node needs one.
+    Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
+    Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
+ }
+
+ @Test
+ public void testSimpleGroupByWithSubtotals()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setDimensions(Arrays.asList(
+ DefaultDimensionSpec.of(QueryRunnerTestHelper.MARKET_DIMENSION),
+ DefaultDimensionSpec.of(QueryRunnerTestHelper.PLACEMENT_DIMENSION),
+ DefaultDimensionSpec.of(QueryRunnerTestHelper.QUALITY_DIMENSION)
+ ))
+ .setGranularity(Granularities.ALL)
+ .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setSubtotalsSpec(Arrays.asList(
+            Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION),
+            Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION, QueryRunnerTestHelper.QUALITY_DIMENSION)
+        ))
+        .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
+        .build();
+
+    Assert.assertEquals(1, GroupByStrategyV2.countRequiredMergeBufferNum(query));
+ GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
+
+ // 1 for subtotal and 1 for GroupByQueryRunnerFactory#mergeRunners
+ Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum());
+ Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
+ }
+
+ @Test
+ public void testSimpleGroupByWithSubtotalsWithoutPrefixMatch()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setDimensions(Arrays.asList(
+ DefaultDimensionSpec.of(QueryRunnerTestHelper.MARKET_DIMENSION),
+ DefaultDimensionSpec.of(QueryRunnerTestHelper.PLACEMENT_DIMENSION),
+ DefaultDimensionSpec.of(QueryRunnerTestHelper.QUALITY_DIMENSION)
+ ))
+ .setGranularity(Granularities.ALL)
+ .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setSubtotalsSpec(Arrays.asList(
+            Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION),
+            Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.QUALITY_DIMENSION)
+        ))
+        .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
+        .build();
+
+    Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query));
+ GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
+
+ // 2 needed by subtotal and 1 for GroupByQueryRunnerFactory#mergeRunners
+ Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
+ Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
+ }
+
+ @Test
+ public void testNestedGroupByWithSubtotals()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(
+ new QueryDataSource(
+ GroupByQuery.builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setGranularity(Granularities.ALL)
+ .setDimensions(Arrays.asList(
+ DefaultDimensionSpec.of("quality"),
+ DefaultDimensionSpec.of("market"),
+ DefaultDimensionSpec.of("placement")
+ ))
+                        .setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
+ .build()
+ )
+ )
+ .setGranularity(Granularities.ALL)
+ .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(Arrays.asList(
+ DefaultDimensionSpec.of("quality"),
+ DefaultDimensionSpec.of("market")
+ ))
+ .setSubtotalsSpec(Arrays.asList(
+ Collections.singletonList("quality"),
+ Collections.singletonList("market")
+ ))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
+ .build();
+
+    Assert.assertEquals(3, GroupByStrategyV2.countRequiredMergeBufferNum(query));
+    GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
+
+    // 2 for subtotal, 1 for nested group by and 1 for GroupByQueryRunnerFactory#mergeRunners
+ Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum());
+ Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
+ }
+
+ @Test
+ public void testNestedGroupByWithNestedSubtotals()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(
+ new QueryDataSource(
+ GroupByQuery.builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setGranularity(Granularities.ALL)
+ .setDimensions(Arrays.asList(
+ DefaultDimensionSpec.of("quality"),
+ DefaultDimensionSpec.of("market"),
+ DefaultDimensionSpec.of("placement")
+ ))
+ .setSubtotalsSpec(Arrays.asList(
+ Collections.singletonList("quality"),
+ Collections.singletonList("market")
+ ))
+                        .setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
+ .build()
+ )
+ )
+ .setGranularity(Granularities.ALL)
+ .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(Arrays.asList(
+ DefaultDimensionSpec.of("quality"),
+ DefaultDimensionSpec.of("market")
+ ))
+ .setSubtotalsSpec(Arrays.asList(
+ Collections.singletonList("quality"),
+ Collections.singletonList("market")
+ ))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
+ .build();
+
+    Assert.assertEquals(3, GroupByStrategyV2.countRequiredMergeBufferNum(query));
     GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
-    // This should be 0 because the broker needs 2 buffers and the queryable node needs one.
+    // 2 for subtotal, 1 for nested group by and 1 for GroupByQueryRunnerFactory#mergeRunners
     Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum());
-    Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
+    Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
}
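
(Arithmetic check: the pool bump from 3 to 4 in this fixture matches the worst case exercised above — testNestedGroupByWithNestedSubtotals needs 2 buffers for the non-prefix subtotals plus 1 for the nested groupBy layer on the broker, and 1 more for GroupByQueryRunnerFactory#mergeRunners on the queryable node.)
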
diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
index 6805c10..ae0c269 100644
--- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
+++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
@@ -139,7 +139,10 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase
         new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 0),
         new CacheConfig(),
         new DruidHttpClientConfig(),
-        QueryStackTests.getProcessingConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
+        QueryStackTests.getProcessingConfig(
+            USE_PARALLEL_MERGE_POOL_CONFIGURED,
+            DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS
+        ),
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index 3c6072a..074649f 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -180,7 +180,10 @@ public class QueryStackTests
);
}
-  public static DruidProcessingConfig getProcessingConfig(boolean useParallelMergePoolConfigured)
+  public static DruidProcessingConfig getProcessingConfig(
+      boolean useParallelMergePoolConfigured,
+      final int mergeBuffers
+  )
{
return new DruidProcessingConfig()
{
@@ -206,9 +209,10 @@ public class QueryStackTests
@Override
public int getNumMergeBuffers()
{
- // Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby.
- // Two buffers for the broker and one for the queryable.
- return 3;
+ if (mergeBuffers == DEFAULT_NUM_MERGE_BUFFERS) {
+ return 2;
+ }
+ return mergeBuffers;
}
@Override
@@ -230,9 +234,15 @@ public class QueryStackTests
   public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
       final Closer closer,
       final boolean useParallelMergePoolConfigured
+
   )
   {
-    return createQueryRunnerFactoryConglomerate(closer, getProcessingConfig(useParallelMergePoolConfigured));
+    return createQueryRunnerFactoryConglomerate(closer,
+        getProcessingConfig(
+            useParallelMergePoolConfigured,
+            DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS
+        )
+    );
}
   public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
index 34ef9af..f8da5cb 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
@@ -30,6 +30,7 @@ import java.util.Objects;
public class PlannerConfig
{
   public static final String CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT = "useApproximateCountDistinct";
+  public static final String CTX_KEY_USE_GROUPING_SET_FOR_EXACT_DISTINCT = "useGroupingSetForExactDistinct";
   public static final String CTX_KEY_USE_APPROXIMATE_TOPN = "useApproximateTopN";
@JsonProperty
@@ -59,6 +60,9 @@ public class PlannerConfig
@JsonProperty
private long metadataSegmentPollPeriod = 60000;
+ @JsonProperty
+ private boolean useGroupingSetForExactDistinct = false;
+
public long getMetadataSegmentPollPeriod()
{
return metadataSegmentPollPeriod;
@@ -86,6 +90,11 @@ public class PlannerConfig
return useApproximateCountDistinct;
}
+ public boolean isUseGroupingSetForExactDistinct()
+ {
+ return useGroupingSetForExactDistinct;
+ }
+
public boolean isUseApproximateTopN()
{
return useApproximateTopN;
@@ -125,6 +134,11 @@ public class PlannerConfig
CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT,
isUseApproximateCountDistinct()
);
+ newConfig.useGroupingSetForExactDistinct = getContextBoolean(
+ context,
+ CTX_KEY_USE_GROUPING_SET_FOR_EXACT_DISTINCT,
+ isUseGroupingSetForExactDistinct()
+ );
newConfig.useApproximateTopN = getContextBoolean(
context,
CTX_KEY_USE_APPROXIMATE_TOPN,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java
index f39aab5..8425d55 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java
@@ -268,9 +268,11 @@ public class Rules
rules.addAll(ABSTRACT_RELATIONAL_RULES);
if (!plannerConfig.isUseApproximateCountDistinct()) {
-      // For some reason, even though we support grouping sets, using AggregateExpandDistinctAggregatesRule.INSTANCE
-      // here causes CalciteQueryTest#testExactCountDistinctWithGroupingAndOtherAggregators to fail.
-      rules.add(AggregateExpandDistinctAggregatesRule.JOIN);
+ if (plannerConfig.isUseGroupingSetForExactDistinct()) {
+ rules.add(AggregateExpandDistinctAggregatesRule.INSTANCE);
+ } else {
+ rules.add(AggregateExpandDistinctAggregatesRule.JOIN);
+ }
}
// Rules that we wrote.
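
For intuition about what the two Calcite rules produce: a hand-written sketch (not planner output) of the grouping-set restatement that AggregateExpandDistinctAggregatesRule.INSTANCE corresponds to. Each grouping set keeps one row per distinct value of its column, and GROUPING() identifies which set a row came from, so a filtered COUNT recovers each exact distinct count in a single pass. Table and column names are assumptions for illustration.

    // GROUPING(city, "user") is 1 when "user" is aggregated away (the (city) set)
    // and 2 when city is aggregated away (the ("user") set) -- the same bitmask
    // values the filtered aggregators select on in the test plan further below.
    String original =
        "SELECT COUNT(DISTINCT city), COUNT(DISTINCT \"user\") FROM visits";
    String groupingSetForm =
        "SELECT\n"
        + "  COUNT(city) FILTER (WHERE g = 1),\n"
        + "  COUNT(\"user\") FILTER (WHERE g = 2)\n"
        + "FROM (\n"
        + "  SELECT city, \"user\", GROUPING(city, \"user\") AS g\n"
        + "  FROM visits\n"
        + "  GROUP BY GROUPING SETS ((city), (\"user\"))\n"
        + ") t";
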
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index b39c351..666fa15 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -426,6 +426,12 @@ public abstract class DruidAvaticaHandlerTest extends CalciteTestBase
Pair.of("TABLE_NAME", CalciteTests.SOMEXDATASOURCE),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
+ ),
+ row(
+ Pair.of("TABLE_CAT", "druid"),
+ Pair.of("TABLE_NAME", CalciteTests.USERVISITDATASOURCE),
+ Pair.of("TABLE_SCHEM", "druid"),
+ Pair.of("TABLE_TYPE", "TABLE")
)
),
getRows(
@@ -494,6 +500,12 @@ public abstract class DruidAvaticaHandlerTest extends CalciteTestBase
Pair.of("TABLE_NAME", CalciteTests.SOMEXDATASOURCE),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
+ ),
+ row(
+ Pair.of("TABLE_CAT", "druid"),
+ Pair.of("TABLE_NAME", CalciteTests.USERVISITDATASOURCE),
+ Pair.of("TABLE_SCHEM", "druid"),
+ Pair.of("TABLE_TYPE", "TABLE")
)
),
getRows(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index 8e03926..8396d38 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -891,4 +891,16 @@ public class BaseCalciteQueryTest extends CalciteTestBase
newContext.put(QueryContexts.SQL_JOIN_LEFT_SCAN_DIRECT, true);
return newContext;
}
+
+  /**
+   * Reset the walker and conglomerate with the required number of merge buffers. The default is 2.
+   */
+  protected void requireMergeBuffers(int numMergeBuffers) throws IOException
+ {
+ conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(
+ resourceCloser,
+ QueryStackTests.getProcessingConfig(true, numMergeBuffers)
+ );
+    walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
+ }
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
index 3a125d5..46c3991 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
@@ -20,12 +20,8 @@
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.query.QueryDataSource;
@@ -42,58 +38,21 @@ import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.groupby.GroupByQuery;
-import org.apache.druid.segment.IndexBuilder;
-import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ValueType;
-import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
-import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.junit.Before;
+import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Test;
import org.junit.runner.RunWith;
-import java.io.File;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
@RunWith(JUnitParamsRunner.class)
public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
{
-  private static final IncrementalIndexSchema INDEX_SCHEMA = new IncrementalIndexSchema.Builder()
-      .withMetrics(
-          new CountAggregatorFactory("cnt")
-      )
-      .withRollup(false)
-      .withMinTimestamp(DateTimes.of("2020-12-31").getMillis())
-      .build();
-  private static final List<String> DIMENSIONS = ImmutableList.of("user", "country", "city");
-
- @Before
- public void setup() throws Exception
- {
- final QueryableIndex index1 = IndexBuilder
- .create()
- .tmpDir(new File(temporaryFolder.newFolder(), "1"))
-        .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
- .schema(INDEX_SCHEMA)
- .rows(getRawRows())
- .buildMMappedIndex();
- final DataSegment segment = DataSegment.builder()
- .dataSource("visits")
- .interval(index1.getDataInterval())
- .version("1")
- .shardSpec(new LinearShardSpec(0))
- .size(0)
- .build();
- walker.add(segment, index1);
-
- }
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@@ -115,12 +74,12 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
join(
- new TableDataSource("visits"),
+ new
TableDataSource(CalciteTests.USERVISITDATASOURCE),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
-
.setDataSource("visits")
+
.setDataSource(CalciteTests.USERVISITDATASOURCE)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
@@ -222,12 +181,12 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
join(
- new TableDataSource("visits"),
+ new
TableDataSource(CalciteTests.USERVISITDATASOURCE),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
-
.setDataSource("visits")
+
.setDataSource(CalciteTests.USERVISITDATASOURCE)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
@@ -304,7 +263,7 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
join(
-                new QueryDataSource(newScanQueryBuilder().dataSource("visits")
+                new QueryDataSource(newScanQueryBuilder().dataSource(CalciteTests.USERVISITDATASOURCE)
                     .intervals(querySegmentSpec(Intervals.of("2021-01-01T01:00:00.000Z/2021-01-02T23:59:59.001Z")))
.filters(selector("city", "B", null))
@@ -314,7 +273,7 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
-                                .setDataSource("visits")
+                                .setDataSource(CalciteTests.USERVISITDATASOURCE)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
@@ -390,12 +349,12 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
join(
- new TableDataSource("visits"),
+ new
TableDataSource(CalciteTests.USERVISITDATASOURCE),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
-
.setDataSource("visits")
+
.setDataSource(CalciteTests.USERVISITDATASOURCE)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
@@ -477,12 +436,12 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
join(
- new TableDataSource("visits"),
+ new
TableDataSource(CalciteTests.USERVISITDATASOURCE),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
-
.setDataSource("visits")
+
.setDataSource(CalciteTests.USERVISITDATASOURCE)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setVirtualColumns(new ExpressionVirtualColumn(
"v0",
@@ -544,26 +503,4 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
);
}
-  private List<InputRow> getRawRows()
-  {
-    return ImmutableList.of(
-        toRow("2021-01-01T01:00:00Z", ImmutableMap.of("user", "alice", "country", "canada", "city", "A")),
-        toRow("2021-01-01T02:00:00Z", ImmutableMap.of("user", "alice", "country", "canada", "city", "B")),
-        toRow("2021-01-01T03:00:00Z", ImmutableMap.of("user", "bob", "country", "canada", "city", "A")),
-        toRow("2021-01-01T04:00:00Z", ImmutableMap.of("user", "alice", "country", "India", "city", "Y")),
-        toRow("2021-01-02T01:00:00Z", ImmutableMap.of("user", "alice", "country", "canada", "city", "A")),
-        toRow("2021-01-02T02:00:00Z", ImmutableMap.of("user", "bob", "country", "canada", "city", "A")),
-        toRow("2021-01-02T03:00:00Z", ImmutableMap.of("user", "foo", "country", "canada", "city", "B")),
-        toRow("2021-01-02T04:00:00Z", ImmutableMap.of("user", "bar", "country", "canada", "city", "B")),
-        toRow("2021-01-02T05:00:00Z", ImmutableMap.of("user", "alice", "country", "India", "city", "X")),
-        toRow("2021-01-02T06:00:00Z", ImmutableMap.of("user", "bob", "country", "India", "city", "X")),
-        toRow("2021-01-02T07:00:00Z", ImmutableMap.of("user", "foo", "country", "India", "city", "X")),
-        toRow("2021-01-03T01:00:00Z", ImmutableMap.of("user", "foo", "country", "USA", "city", "M"))
-    );
-  }
-
-  private MapBasedInputRow toRow(String time, Map<String, Object> event)
-  {
-    return new MapBasedInputRow(DateTimes.ISO_DATE_OPTIONAL_TIME.parse(time), DIMENSIONS, event);
-  }
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index c23b78a..0b58cff 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -111,6 +111,7 @@ import org.apache.druid.sql.SqlPlanningException.PlanningError;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.CannotBuildQueryException;
import org.apache.druid.sql.calcite.util.CalciteTests;
@@ -877,6 +878,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE",
"NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE",
"NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE",
"NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE,
"TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE",
"NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA",
"SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE",
"NO", "NO"})
@@ -912,6 +914,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE",
"NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE",
"NO", "NO"})
.add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE",
"NO", "NO"})
+ .add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE,
"TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE",
"NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA",
"SYSTEM_TABLE", "NO", "NO"})
.add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE",
"NO", "NO"})
@@ -7610,6 +7613,72 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
}
@Test
+  public void testMultipleExactCountDistinctWithGroupingAndOtherAggregators() throws Exception
+  {
+    requireMergeBuffers(4);
+    testQuery(
+        PLANNER_CONFIG_NO_HLL.withOverrides(ImmutableMap.of(PlannerConfig.CTX_KEY_USE_GROUPING_SET_FOR_EXACT_DISTINCT, "true")),
+        "SELECT FLOOR(__time to day), COUNT(distinct city), COUNT(distinct user) FROM druid.visits GROUP BY 1",
+        CalciteTests.REGULAR_USER_AUTH_RESULT,
+        ImmutableList.of(
+            GroupByQuery.builder()
+                .setDataSource(
+                    new QueryDataSource(
+                        GroupByQuery.builder()
+                            .setDataSource(CalciteTests.USERVISITDATASOURCE)
+                            .setInterval(querySegmentSpec(Filtration.eternity()))
+                            .setGranularity(Granularities.ALL)
+                            .setVirtualColumns(expressionVirtualColumn(
+                                "v0",
+                                "timestamp_floor(\"__time\",'P1D',null,'UTC')",
+                                ValueType.LONG
+                            ))
+                            .setDimensions(dimensions(
+                                new DefaultDimensionSpec("v0", "d0", ValueType.LONG),
+                                new DefaultDimensionSpec("city", "d1"),
+                                new DefaultDimensionSpec("user", "d2")
+                            ))
+                            .setAggregatorSpecs(aggregators(new GroupingAggregatorFactory(
+                                "a0",
+                                Arrays.asList(
+                                    "v0",
+                                    "city",
+                                    "user"
+                                )
+                            )))
+                            .setSubtotalsSpec(ImmutableList.of(
+                                ImmutableList.of("d0", "d1"),
+                                ImmutableList.of("d0", "d2")
+                            ))
+                            .setContext(QUERY_CONTEXT_DEFAULT)
+                            .build()
+                    )
+                )
+                .setInterval(querySegmentSpec(Filtration.eternity()))
+                .setGranularity(Granularities.ALL)
+                .setDimensions(dimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.LONG)))
+                .setAggregatorSpecs(aggregators(
+                    new FilteredAggregatorFactory(
+                        new CountAggregatorFactory("_a0"),
+                        and(not(selector("d1", null, null)), selector("a0", "1", null))
+                    ),
+                    new FilteredAggregatorFactory(
+                        new CountAggregatorFactory("_a1"),
+                        and(not(selector("d2", null, null)), selector("a0", "2", null))
+                    )
+                ))
+                .setContext(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{1609459200000L, 3L, 2L},
+            new Object[]{1609545600000L, 3L, 4L},
+            new Object[]{1609632000000L, 1L, 1L}
+        )
+    );
+  }
+
+ @Test
public void testApproxCountDistinct() throws Exception
{
// Cannot vectorize due to virtual columns.
@@ -7760,6 +7829,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Test
public void testDoubleNestedGroupBy() throws Exception
{
+ requireMergeBuffers(3);
testQuery(
"SELECT SUM(cnt), COUNT(*) FROM (\n"
+ " SELECT dim2, SUM(t1.cnt) cnt FROM (\n"
@@ -12521,6 +12591,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Test
public void testGroupingAggregatorDifferentOrder() throws Exception
{
+ requireMergeBuffers(3);
+
// Cannot vectorize due to virtual columns.
cannotVectorize();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index e4a6ec0..5a78f0f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -35,6 +35,7 @@ import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
@@ -160,6 +161,7 @@ public class CalciteTests
public static final String SOME_DATASOURCE = "some_datasource";
public static final String SOME_DATSOURCE_ESCAPED = "some\\_datasource";
public static final String SOMEXDATASOURCE = "somexdatasource";
+ public static final String USERVISITDATASOURCE = "visits";
public static final String DRUID_SCHEMA_NAME = "druid";
public static final String INFORMATION_SCHEMA_NAME = "INFORMATION_SCHEMA";
public static final String SYSTEM_SCHEMA_NAME = "sys";
@@ -365,6 +367,15 @@ public class CalciteTests
.withRollup(false)
.build();
+ private static final List<String> USER_VISIT_DIMS = ImmutableList.of("user",
"country", "city");
+ private static final IncrementalIndexSchema INDEX_SCHEMA_USER_VISIT = new
IncrementalIndexSchema.Builder()
+ .withMetrics(
+ new CountAggregatorFactory("cnt")
+ )
+ .withRollup(false)
+ .withMinTimestamp(DateTimes.of("2020-12-31").getMillis())
+ .build();
+
   public static final List<ImmutableMap<String, Object>> RAW_ROWS1 = ImmutableList.of(
ImmutableMap.<String, Object>builder()
.put("t", "2000-01-01")
@@ -647,6 +658,33 @@ public class CalciteTests
)
);
+  private static List<InputRow> USER_VISIT_ROWS = ImmutableList.of(
+      toRow(
+          "2021-01-01T01:00:00Z",
+          USER_VISIT_DIMS,
+          ImmutableMap.of("user", "alice", "country", "canada", "city", "A")
+      ),
+      toRow(
+          "2021-01-01T02:00:00Z",
+          USER_VISIT_DIMS,
+          ImmutableMap.of("user", "alice", "country", "canada", "city", "B")
+      ),
+      toRow("2021-01-01T03:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bob", "country", "canada", "city", "A")),
+      toRow("2021-01-01T04:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "alice", "country", "India", "city", "Y")),
+      toRow(
+          "2021-01-02T01:00:00Z",
+          USER_VISIT_DIMS,
+          ImmutableMap.of("user", "alice", "country", "canada", "city", "A")
+      ),
+      toRow("2021-01-02T02:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bob", "country", "canada", "city", "A")),
+      toRow("2021-01-02T03:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "foo", "country", "canada", "city", "B")),
+      toRow("2021-01-02T04:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bar", "country", "canada", "city", "B")),
+      toRow("2021-01-02T05:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "alice", "country", "India", "city", "X")),
+      toRow("2021-01-02T06:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bob", "country", "India", "city", "X")),
+      toRow("2021-01-02T07:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "foo", "country", "India", "city", "X")),
+      toRow("2021-01-03T01:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "foo", "country", "USA", "city", "M"))
+  );
+
   private static final InlineDataSource JOINABLE_BACKING_DATA = InlineDataSource.fromIterable(
RAW_ROWS1_WITH_NUMERIC_DIMS.stream().map(x -> new Object[]{
x.get("dim1"),
@@ -856,6 +894,14 @@ public class CalciteTests
.rows(RAW_ROWS1_X)
.buildMMappedIndex();
+    final QueryableIndex userVisitIndex = IndexBuilder
+        .create()
+        .tmpDir(new File(tmpDir, "8"))
+        .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+        .schema(INDEX_SCHEMA)
+        .rows(USER_VISIT_ROWS)
+        .buildMMappedIndex();
+
return new SpecificSegmentsQuerySegmentWalker(
conglomerate,
@@ -943,9 +989,23 @@ public class CalciteTests
.size(0)
.build(),
indexNumericDims
+ ).add(
+ DataSegment.builder()
+ .dataSource(USERVISITDATASOURCE)
+ .interval(userVisitIndex.getDataInterval())
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build(),
+ userVisitIndex
);
}
+  private static MapBasedInputRow toRow(String time, List<String> dimensions, Map<String, Object> event)
+  {
+    return new MapBasedInputRow(DateTimes.ISO_DATE_OPTIONAL_TIME.parse(time), dimensions, event);
+ }
+
public static ExprMacroTable createExprMacroTable()
{
final List<ExprMacroTable.ExprMacro> exprMacros = new ArrayList<>();
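
(Arithmetic check: these rows reproduce the expected results in testMultipleExactCountDistinctWithGroupingAndOtherAggregators above — 2021-01-01 has cities {A, B, Y} and users {alice, bob}, giving 3 and 2; 2021-01-02 has cities {A, B, X} and users {alice, bob, foo, bar}, giving 3 and 4; 2021-01-03 has a single row, giving 1 and 1.)
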
diff --git a/website/.spelling b/website/.spelling
index 76a4a83..9a9d767 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1540,6 +1540,7 @@ druid.server.tier
druid.sql.planner.maxSemiJoinRowsInMemory
druid.sql.planner.sqlTimeZone
druid.sql.planner.useApproximateCountDistinct
+druid.sql.planner.useGroupingSetForExactDistinct
druid.sql.planner.useApproximateTopN
error_msg
exprs
@@ -1572,6 +1573,7 @@ timestamp_expr
tls_port
total_size
useApproximateCountDistinct
+useGroupingSetForExactDistinct
useApproximateTopN
wikipedia
- ../docs/querying/timeseriesquery.md
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]