gianm closed pull request #6221: SQL: Finalize aggregations for inner queries
when necessary.
URL: https://github.com/apache/incubator-druid/pull/6221
This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork-based (foreign) pull
request once it has been merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
index 830771d4a95..af6473bac07 100644
---
a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
+++
b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
@@ -48,6 +48,7 @@
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
@@ -62,6 +63,7 @@ public SqlAggFunction calciteFunction()
return FUNCTION_INSTANCE;
}
+ @Nullable
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
@@ -70,7 +72,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
final DruidExpression input = Expressions.toDruidExpression(
diff --git
a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
index d2573e880ed..6d4ef48081f 100644
---
a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
+++
b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java
@@ -20,13 +20,13 @@
package io.druid.sql.calcite.aggregation;
import com.google.common.collect.ImmutableList;
-import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.VirtualColumn;
import io.druid.segment.column.ValueType;
import io.druid.sql.calcite.expression.DruidExpression;
+import io.druid.sql.calcite.planner.Calcites;
import javax.annotation.Nullable;
import java.util.List;
@@ -80,7 +80,7 @@ public DimensionSpec toDimensionSpec()
@Nullable
public String getVirtualColumnName()
{
- return expression.isSimpleExtraction() ? null : StringUtils.format("%s:v",
outputName);
+ return expression.isSimpleExtraction() ? null :
Calcites.makePrefixedName(outputName, "v");
}
@Override
diff --git
a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
index 83ac84ae5c8..4c8777d4af9 100644
--- a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
+++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java
@@ -53,6 +53,9 @@
* @param project project that should be applied before
aggregation; may be null
* @param existingAggregations existing aggregations for this query; useful
for re-using aggregations. May be safely
* ignored if you do not want to re-use existing
aggregations.
+ * @param finalizeAggregations true if this query should include explicit
finalization for all of its
+ * aggregators, where required. This is set for
subqueries where Druid's native query
+ * layer does not do this automatically.
*
* @return aggregation, or null if the call cannot be translated
*/
@@ -64,6 +67,7 @@ Aggregation toDruidAggregation(
String name,
AggregateCall aggregateCall,
Project project,
- List<Aggregation> existingAggregations
+ List<Aggregation> existingAggregations,
+ boolean finalizeAggregations
);
}
diff --git
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
index dfb1c37f477..208609aaac6 100644
---
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
+++
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java
@@ -22,9 +22,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import
io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
@@ -52,6 +52,7 @@
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
public class ApproxCountDistinctSqlAggregator implements SqlAggregator
@@ -74,7 +75,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
// Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't
let us use direct column access
@@ -92,14 +94,15 @@ public Aggregation toDruidAggregation(
final List<VirtualColumn> virtualColumns = new ArrayList<>();
final AggregatorFactory aggregatorFactory;
+ final String aggregatorName = finalizeAggregations ?
Calcites.makePrefixedName(name, "a") : name;
if (arg.isDirectColumnAccess() &&
rowSignature.getColumnType(arg.getDirectColumn()) == ValueType.COMPLEX) {
- aggregatorFactory = new HyperUniquesAggregatorFactory(name,
arg.getDirectColumn(), false, true);
+ aggregatorFactory = new HyperUniquesAggregatorFactory(aggregatorName,
arg.getDirectColumn(), false, true);
} else {
final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName();
final ValueType inputType =
Calcites.getValueTypeForSqlTypeName(sqlTypeName);
if (inputType == null) {
- throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for
field[%s]", sqlTypeName, name);
+ throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for
field[%s]", sqlTypeName, aggregatorName);
}
final DimensionSpec dimensionSpec;
@@ -108,7 +111,7 @@ public Aggregation toDruidAggregation(
dimensionSpec = arg.getSimpleExtraction().toDimensionSpec(null,
inputType);
} else {
final ExpressionVirtualColumn virtualColumn = arg.toVirtualColumn(
- StringUtils.format("%s:v", name),
+ Calcites.makePrefixedName(name, "v"),
inputType,
plannerContext.getExprMacroTable()
);
@@ -116,10 +119,20 @@ public Aggregation toDruidAggregation(
virtualColumns.add(virtualColumn);
}
- aggregatorFactory = new CardinalityAggregatorFactory(name, null,
ImmutableList.of(dimensionSpec), false, true);
+ aggregatorFactory = new CardinalityAggregatorFactory(
+ aggregatorName,
+ null,
+ ImmutableList.of(dimensionSpec),
+ false,
+ true
+ );
}
- return Aggregation.create(virtualColumns, aggregatorFactory);
+ return Aggregation.create(
+ virtualColumns,
+ Collections.singletonList(aggregatorFactory),
+ finalizeAggregations ? new HyperUniqueFinalizingPostAggregator(name,
aggregatorFactory.getName()) : null
+ );
}
private static class ApproxCountDistinctSqlAggFunction extends SqlAggFunction
diff --git
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
index e490f0fb8ca..5d695bb2655 100644
---
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
+++
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
@@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import io.druid.java.util.common.StringUtils;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
@@ -32,6 +31,7 @@
import io.druid.sql.calcite.aggregation.Aggregations;
import io.druid.sql.calcite.aggregation.SqlAggregator;
import io.druid.sql.calcite.expression.DruidExpression;
+import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.rel.core.AggregateCall;
@@ -61,7 +61,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
@@ -102,8 +103,8 @@ public Aggregation toDruidAggregation(
expression = arg.getExpression();
}
- final String sumName = StringUtils.format("%s:sum", name);
- final String countName = StringUtils.format("%s:count", name);
+ final String sumName = Calcites.makePrefixedName(name, "sum");
+ final String countName = Calcites.makePrefixedName(name, "count");
final AggregatorFactory sum = SumSqlAggregator.createSumAggregatorFactory(
sumType,
sumName,
diff --git
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
index 61aac896e15..8b47037c241 100644
---
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
+++
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
@@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
final List<DruidExpression> args =
Aggregations.getArgumentsForSimpleAggregator(
@@ -87,7 +88,8 @@ public Aggregation toDruidAggregation(
name,
aggregateCall,
project,
- existingAggregations
+ existingAggregations,
+ finalizeAggregations
);
} else {
return null;
diff --git
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
index 9dff97d6dd5..5e96057f4ad 100644
---
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
+++
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java
@@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
diff --git
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
index 0950cad1410..81a26d3a569 100644
---
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
+++
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java
@@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
diff --git
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
index f8076527aaf..61a36c8a7e3 100644
---
a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
+++
b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java
@@ -60,7 +60,8 @@ public Aggregation toDruidAggregation(
final String name,
final AggregateCall aggregateCall,
final Project project,
- final List<Aggregation> existingAggregations
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
if (aggregateCall.isDistinct()) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
index e59c554027a..df401b351ec 100644
--- a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
+++ b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java
@@ -339,21 +339,26 @@ public static boolean isIntLiteral(final RexNode rexNode)
return rexNode instanceof RexLiteral &&
SqlTypeName.INT_TYPES.contains(rexNode.getType().getSqlTypeName());
}
- public static String findOutputNamePrefix(final String basePrefix, final
NavigableSet<String> strings)
+ public static String findUnusedPrefix(final String basePrefix, final
NavigableSet<String> strings)
{
String prefix = basePrefix;
- while (!isUsablePrefix(strings, prefix)) {
+ while (!isUnusedPrefix(prefix, strings)) {
prefix = "_" + prefix;
}
return prefix;
}
- private static boolean isUsablePrefix(final NavigableSet<String> strings,
final String prefix)
+ private static boolean isUnusedPrefix(final String prefix, final
NavigableSet<String> strings)
{
// ":" is one character after "9"
final NavigableSet<String> subSet = strings.subSet(prefix + "0", true,
prefix + ":", false);
return subSet.isEmpty();
}
+
+ public static String makePrefixedName(final String prefix, final String
suffix)
+ {
+ return StringUtils.format("%s:%s", prefix, suffix);
+ }
}
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
index 6caab8d544f..7d0666cd3bf 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java
@@ -88,7 +88,11 @@ public PartialDruidQuery getPartialDruidQuery()
@Override
public Sequence<Object[]> runQuery()
{
- final DruidQuery query = toDruidQuery();
+ // runQuery doesn't need to finalize aggregations, because the fact that
runQuery is happening suggests this
+ // is the outermost query and it will actually get run as a native query.
Druid's native query layer will
+ // finalize aggregations for the outermost query even if we don't
explicitly ask it to.
+
+ final DruidQuery query = toDruidQuery(false);
if (query != null) {
return getQueryMaker().runQuery(query);
} else {
@@ -116,9 +120,11 @@ public int getQueryCount()
@Nullable
@Override
- public DruidQuery toDruidQuery()
+ public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
- final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery();
+ // Must finalize aggregations on subqueries.
+
+ final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery(true);
if (subQuery == null) {
return null;
}
@@ -128,7 +134,8 @@ public DruidQuery toDruidQuery()
new QueryDataSource(subQuery.toGroupByQuery()),
sourceRowSignature,
getPlannerContext(),
- getCluster().getRexBuilder()
+ getCluster().getRexBuilder(),
+ finalizeAggregations
);
}
@@ -142,7 +149,8 @@ public DruidQuery toDruidQueryForExplaining()
sourceRel.getRowType()
),
getPlannerContext(),
- getCluster().getRexBuilder()
+ getCluster().getRexBuilder(),
+ false
);
}
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
index 29801265ed0..e6d5ba5e767 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java
@@ -131,7 +131,8 @@ public DruidQuery(
final DataSource dataSource,
final RowSignature sourceRowSignature,
final PlannerContext plannerContext,
- final RexBuilder rexBuilder
+ final RexBuilder rexBuilder,
+ final boolean finalizeAggregations
)
{
this.dataSource = dataSource;
@@ -142,7 +143,7 @@ public DruidQuery(
// Now the fun begins.
this.filter = computeWhereFilter(partialQuery, sourceRowSignature,
plannerContext);
this.selectProjection = computeSelectProjection(partialQuery,
plannerContext, sourceRowSignature);
- this.grouping = computeGrouping(partialQuery, plannerContext,
sourceRowSignature, rexBuilder);
+ this.grouping = computeGrouping(partialQuery, plannerContext,
sourceRowSignature, rexBuilder, finalizeAggregations);
final RowSignature sortingInputRowSignature;
@@ -222,7 +223,7 @@ private static SelectProjection computeSelectProjection(
final List<VirtualColumn> virtualColumns = new ArrayList<>();
final List<String> rowOrder = new ArrayList<>();
- final String virtualColumnPrefix = Calcites.findOutputNamePrefix(
+ final String virtualColumnPrefix = Calcites.findUnusedPrefix(
"v",
new TreeSet<>(sourceRowSignature.getRowOrder())
);
@@ -254,7 +255,8 @@ private static Grouping computeGrouping(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final RowSignature sourceRowSignature,
- final RexBuilder rexBuilder
+ final RexBuilder rexBuilder,
+ final boolean finalizeAggregations
)
{
final Aggregate aggregate = partialQuery.getAggregate();
@@ -269,7 +271,8 @@ private static Grouping computeGrouping(
partialQuery,
plannerContext,
sourceRowSignature,
- rexBuilder
+ rexBuilder,
+ finalizeAggregations
);
final RowSignature aggregateRowSignature = RowSignature.from(
@@ -428,7 +431,7 @@ private static ProjectRowOrderAndPostAggregations
computePostAggregations(
{
final Aggregate aggregate =
Preconditions.checkNotNull(partialQuery.getAggregate());
final List<DimensionExpression> dimensions = new ArrayList<>();
- final String outputNamePrefix = Calcites.findOutputNamePrefix("d", new
TreeSet<>(sourceRowSignature.getRowOrder()));
+ final String outputNamePrefix = Calcites.findUnusedPrefix("d", new
TreeSet<>(sourceRowSignature.getRowOrder()));
int outputNameCounter = 0;
for (int i : aggregate.getGroupSet()) {
@@ -460,10 +463,13 @@ private static ProjectRowOrderAndPostAggregations
computePostAggregations(
/**
* Returns aggregations corresponding to {@code aggregate.getAggCallList()},
in the same order.
*
- * @param partialQuery partial query
- * @param plannerContext planner context
- * @param sourceRowSignature source row signature
- * @param rexBuilder calcite RexBuilder
+ * @param partialQuery partial query
+ * @param plannerContext planner context
+ * @param sourceRowSignature source row signature
+ * @param rexBuilder calcite RexBuilder
+ * @param finalizeAggregations true if this query should include explicit
finalization for all of its
+ * aggregators, where required. Useful for
subqueries where Druid's native query layer
+ * does not do this automatically.
*
* @return aggregations
*
@@ -473,12 +479,13 @@ private static ProjectRowOrderAndPostAggregations
computePostAggregations(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final RowSignature sourceRowSignature,
- final RexBuilder rexBuilder
+ final RexBuilder rexBuilder,
+ final boolean finalizeAggregations
)
{
final Aggregate aggregate =
Preconditions.checkNotNull(partialQuery.getAggregate());
final List<Aggregation> aggregations = new ArrayList<>();
- final String outputNamePrefix = Calcites.findOutputNamePrefix("a", new
TreeSet<>(sourceRowSignature.getRowOrder()));
+ final String outputNamePrefix = Calcites.findUnusedPrefix("a", new
TreeSet<>(sourceRowSignature.getRowOrder()));
for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
final String aggName = outputNamePrefix + i;
@@ -490,7 +497,8 @@ private static ProjectRowOrderAndPostAggregations
computePostAggregations(
partialQuery.getSelectProject(),
aggCall,
aggregations,
- aggName
+ aggName,
+ finalizeAggregations
);
if (aggregation == null) {
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
index 0b1088a8ac2..57dea9b91ae 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java
@@ -93,20 +93,21 @@ public static DruidQueryRel fullScan(
@Override
@Nonnull
- public DruidQuery toDruidQuery()
+ public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
return partialQuery.build(
druidTable.getDataSource(),
druidTable.getRowSignature(),
getPlannerContext(),
- getCluster().getRexBuilder()
+ getCluster().getRexBuilder(),
+ finalizeAggregations
);
}
@Override
public DruidQuery toDruidQueryForExplaining()
{
- return toDruidQuery();
+ return toDruidQuery(false);
}
@Override
@@ -169,7 +170,11 @@ public int getQueryCount()
@Override
public Sequence<Object[]> runQuery()
{
- return getQueryMaker().runQuery(toDruidQuery());
+ // runQuery doesn't need to finalize aggregations, because the fact that
runQuery is happening suggests this
+ // is the outermost query and it will actually get run as a native query.
Druid's native query layer will
+ // finalize aggregations for the outermost query even if we don't
explicitly ask it to.
+
+ return getQueryMaker().runQuery(toDruidQuery(false));
}
@Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
index 7739c9b3e14..12053a75b99 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java
@@ -76,12 +76,16 @@ public boolean isValidDruidQuery()
*
* This method may return null if it knows that this rel will yield an empty
result set.
*
+ * @param finalizeAggregations true if this query should include explicit
finalization for all of its
+ * aggregators, where required. Useful for
subqueries where Druid's native query layer
+ * does not do this automatically.
+ *
* @return query, or null if it is known in advance that this rel will yield
an empty result set.
*
* @throws CannotBuildQueryException
*/
@Nullable
- public abstract DruidQuery toDruidQuery();
+ public abstract DruidQuery toDruidQuery(boolean finalizeAggregations);
/**
* Convert this DruidRel to a DruidQuery for purposes of explaining. This
must be an inexpensive operation. For
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
index 65fe55f9545..f54fd51c68e 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java
@@ -143,10 +143,10 @@ public DruidSemiJoin withPartialQuery(final
PartialDruidQuery newQueryBuilder)
@Nullable
@Override
- public DruidQuery toDruidQuery()
+ public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
final DruidRel rel = getLeftRelWithFilter();
- return rel != null ? rel.toDruidQuery() : null;
+ return rel != null ? rel.toDruidQuery(finalizeAggregations) : null;
}
@Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
index 9483fa25f90..5adfacf69ec 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java
@@ -278,10 +278,11 @@ public DruidQuery build(
final DataSource dataSource,
final RowSignature sourceRowSignature,
final PlannerContext plannerContext,
- final RexBuilder rexBuilder
+ final RexBuilder rexBuilder,
+ final boolean finalizeAggregations
)
{
- return new DruidQuery(this, dataSource, sourceRowSignature,
plannerContext, rexBuilder);
+ return new DruidQuery(this, dataSource, sourceRowSignature,
plannerContext, rexBuilder, finalizeAggregations);
}
public boolean canAccept(final Stage stage)
diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
index 7c44c7bd210..642c8860c9d 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java
@@ -107,7 +107,7 @@ public DruidQueryRule(
{
super(
operand(relClass, operand(DruidRel.class, any())),
- StringUtils.format("%s:%s", DruidQueryRule.class.getSimpleName(),
stage)
+ StringUtils.format("%s(%s)", DruidQueryRule.class.getSimpleName(),
stage)
);
this.stage = stage;
this.f = f;
@@ -261,7 +261,7 @@ public void onMatch(RelOptRuleCall call)
public DruidOuterQueryRule(final RelOptRuleOperand op, final String
description)
{
- super(op, StringUtils.format("%s:%s",
DruidOuterQueryRel.class.getSimpleName(), description));
+ super(op, StringUtils.format("%s(%s)",
DruidOuterQueryRel.class.getSimpleName(), description));
}
@Override
diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
index 368bfac87b3..c42fba6e45b 100644
--- a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
+++ b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java
@@ -57,7 +57,8 @@ public static Aggregation translateAggregateCall(
final Project project,
final AggregateCall call,
final List<Aggregation> existingAggregations,
- final String name
+ final String name,
+ final boolean finalizeAggregations
)
{
final DimFilter filter;
@@ -125,7 +126,8 @@ public static Aggregation translateAggregateCall(
name,
call,
project,
- existingAggregationsWithSameFilter
+ existingAggregationsWithSameFilter,
+ finalizeAggregations
);
if (retVal == null) {
diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
index 7a703412fec..d8b01567ba6 100644
--- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java
@@ -50,6 +50,7 @@
import io.druid.query.aggregation.LongMinAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import
io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ExpressionPostAggregator;
@@ -2560,8 +2561,10 @@ public void
testGroupByWithSortOnPostAggregationNoTopNConfig() throws Exception
.setInterval(QSS(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(DIMS(new DefaultDimensionSpec("dim1",
"d0")))
- .setAggregatorSpecs(new
FloatMinAggregatorFactory("a0", "m1"),
- new
FloatMaxAggregatorFactory("a1", "m1"))
+ .setAggregatorSpecs(
+ new FloatMinAggregatorFactory("a0", "m1"),
+ new FloatMaxAggregatorFactory("a1", "m1")
+ )
.setPostAggregatorSpecs(ImmutableList.of(EXPRESSION_POST_AGG("p0", "(\"a0\" +
\"a1\")")))
.setLimitSpec(
new DefaultLimitSpec(
@@ -2602,8 +2605,10 @@ public void
testGroupByWithSortOnPostAggregationNoTopNContext() throws Exception
.setInterval(QSS(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(DIMS(new DefaultDimensionSpec("dim1",
"d0")))
- .setAggregatorSpecs(new
FloatMinAggregatorFactory("a0", "m1"),
- new
FloatMaxAggregatorFactory("a1", "m1"))
+ .setAggregatorSpecs(
+ new FloatMinAggregatorFactory("a0", "m1"),
+ new FloatMaxAggregatorFactory("a1", "m1")
+ )
.setPostAggregatorSpecs(
ImmutableList.of(
EXPRESSION_POST_AGG("p0", "(\"a0\" + \"a1\")")
@@ -4384,6 +4389,74 @@ public void testExactCountDistinctUsingSubquery() throws
Exception
);
}
+ @Test
+ public void testAvgDailyCountDistinct() throws Exception
+ {
+ testQuery(
+ "SELECT\n"
+ + " AVG(u)\n"
+ + "FROM (SELECT FLOOR(__time TO DAY), APPROX_COUNT_DISTINCT(cnt) AS u
FROM druid.foo GROUP BY 1)",
+ ImmutableList.of(
+ GroupByQuery.builder()
+ .setDataSource(
+ new QueryDataSource(
+ GroupByQuery.builder()
+
.setDataSource(CalciteTests.DATASOURCE1)
+
.setInterval(QSS(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setVirtualColumns(
+ EXPRESSION_VIRTUAL_COLUMN(
+ "d0:v",
+
"timestamp_floor(\"__time\",'P1D',null,'UTC')",
+ ValueType.LONG
+ )
+ )
+ .setDimensions(DIMS(new
DefaultDimensionSpec("d0:v", "d0", ValueType.LONG)))
+ .setAggregatorSpecs(
+ AGGS(
+ new
CardinalityAggregatorFactory(
+ "a0:a",
+ null,
+ DIMS(new
DefaultDimensionSpec("cnt", "cnt", ValueType.LONG)),
+ false,
+ true
+ )
+ )
+ )
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new
HyperUniqueFinalizingPostAggregator("a0", "a0:a")
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ )
+ )
+ .setInterval(QSS(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(AGGS(
+ new LongSumAggregatorFactory("_a0:sum", "a0"),
+ new CountAggregatorFactory("_a0:count")
+ ))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ArithmeticPostAggregator(
+ "_a0",
+ "quotient",
+ ImmutableList.of(
+ new FieldAccessPostAggregator(null,
"_a0:sum"),
+ new FieldAccessPostAggregator(null,
"_a0:count")
+ )
+ )
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(new Object[]{1L})
+ );
+ }
+
@Test
public void testTopNFilterJoin() throws Exception
{
@@ -6897,7 +6970,10 @@ public void testProjectAfterSort2() throws Exception
.setAggregatorSpecs(
AGGS(new CountAggregatorFactory("a0"), new
DoubleSumAggregatorFactory("a1", "m2"))
)
-
.setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG("p0",
"(\"a1\" / \"a0\")")))
+
.setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG(
+ "p0",
+ "(\"a1\" / \"a0\")"
+ )))
.setLimitSpec(
new DefaultLimitSpec(
Collections.singletonList(
diff --git a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
index beb503ddaa4..9542b03062d 100644
--- a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java
@@ -40,16 +40,16 @@ public void testEscapeStringLiteral()
}
@Test
- public void testFindOutputNamePrefix()
+ public void testFindUnusedPrefix()
{
- Assert.assertEquals("x", Calcites.findOutputNamePrefix("x",
ImmutableSortedSet.of("foo", "bar")));
- Assert.assertEquals("x", Calcites.findOutputNamePrefix("x",
ImmutableSortedSet.of("foo", "bar", "x")));
- Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x",
ImmutableSortedSet.of("foo", "bar", "x0")));
- Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x",
ImmutableSortedSet.of("foo", "bar", "x4")));
- Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x",
ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
- Assert.assertEquals("x", Calcites.findOutputNamePrefix("x",
ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
- Assert.assertEquals("x", Calcites.findOutputNamePrefix("x",
ImmutableSortedSet.of("foo", "_xbxx")));
- Assert.assertEquals("x", Calcites.findOutputNamePrefix("x",
ImmutableSortedSet.of("foo", "xa", "_x")));
- Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x",
ImmutableSortedSet.of("foo", "x1a", "_x90")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefix("x",
ImmutableSortedSet.of("foo", "bar")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefix("x",
ImmutableSortedSet.of("foo", "bar", "x")));
+ Assert.assertEquals("_x", Calcites.findUnusedPrefix("x",
ImmutableSortedSet.of("foo", "bar", "x0")));
+ Assert.assertEquals("_x", Calcites.findUnusedPrefix("x",
ImmutableSortedSet.of("foo", "bar", "x4")));
+ Assert.assertEquals("__x", Calcites.findUnusedPrefix("x",
ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefix("x",
ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefix("x",
ImmutableSortedSet.of("foo", "_xbxx")));
+ Assert.assertEquals("x", Calcites.findUnusedPrefix("x",
ImmutableSortedSet.of("foo", "xa", "_x")));
+ Assert.assertEquals("__x", Calcites.findUnusedPrefix("x",
ImmutableSortedSet.of("foo", "x1a", "_x90")));
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]