This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b5a87fd89b Support constant args in window functions (#15071)
b5a87fd89b is described below
commit b5a87fd89bc8480702c90cc1b678d20f01c66426
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Sun Oct 8 08:44:25 2023 +0200
Support constant args in window functions (#15071)
Instead of passing the constants around in a new parameter; InputAccessor
was introduced to take care of transparently handling the constants - this new
class started picking up some copy-paste debris around field accesses; and made
them a little bit more readble.
---
.../CompressedBigDecimalSqlAggregatorBase.java | 38 ++------
.../sql/TDigestGenerateSketchSqlAggregator.java | 25 +----
.../sql/TDigestSketchQuantileSqlAggregator.java | 32 ++----
.../hll/sql/HllSketchBaseSqlAggregator.java | 38 ++------
.../DoublesSketchApproxQuantileSqlAggregator.java | 32 ++----
.../sql/DoublesSketchObjectSqlAggregator.java | 25 +----
.../theta/sql/ThetaSketchBaseSqlAggregator.java | 31 ++----
.../sql/ArrayOfDoublesSketchSqlAggregator.java | 31 ++----
.../bloom/sql/BloomFilterSqlAggregator.java | 24 +----
...FixedBucketsHistogramQuantileSqlAggregator.java | 53 ++--------
.../histogram/sql/QuantileSqlAggregator.java | 37 ++-----
.../variance/sql/BaseVarianceSqlAggregator.java | 19 +---
.../sql/calcite/aggregation/Aggregations.java | 15 +--
.../ApproxCountDistinctSqlAggregator.java | 12 +--
.../sql/calcite/aggregation/SqlAggregator.java | 46 ++++++++-
.../builtin/ArrayConcatSqlAggregator.java | 17 +---
.../aggregation/builtin/ArraySqlAggregator.java | 18 +---
.../aggregation/builtin/AvgSqlAggregator.java | 27 ++----
.../aggregation/builtin/BitwiseSqlAggregator.java | 12 +--
.../BuiltinApproxCountDistinctSqlAggregator.java | 23 ++---
.../aggregation/builtin/CountSqlAggregator.java | 30 ++----
.../builtin/EarliestLatestAnySqlAggregator.java | 20 ++--
.../builtin/EarliestLatestBySqlAggregator.java | 17 +---
.../aggregation/builtin/GroupingSqlAggregator.java | 16 ++-
.../aggregation/builtin/LiteralSqlAggregator.java | 10 +-
.../aggregation/builtin/SimpleSqlAggregator.java | 12 +--
.../aggregation/builtin/StringSqlAggregator.java | 26 ++---
.../sql/calcite/expression/WindowSqlAggregate.java | 8 +-
.../apache/druid/sql/calcite/rel/DruidQuery.java | 6 +-
.../druid/sql/calcite/rel/InputAccessor.java | 108 +++++++++++++++++++++
.../apache/druid/sql/calcite/rel/Windowing.java | 6 +-
.../druid/sql/calcite/rule/GroupByRules.java | 14 +--
.../calcite/tests/window/aggregateConstant.sqlTest | 26 +++++
33 files changed, 357 insertions(+), 497 deletions(-)
diff --git
a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalSqlAggregatorBase.java
b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalSqlAggregatorBase.java
index 4a61f0271e..a6c2355159 100644
---
a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalSqlAggregatorBase.java
+++
b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalSqlAggregatorBase.java
@@ -21,8 +21,6 @@ package org.apache.druid.compressedbigdecimal;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -36,12 +34,12 @@ import org.apache.calcite.util.Optionality;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -71,12 +69,10 @@ public abstract class CompressedBigDecimalSqlAggregatorBase
implements SqlAggreg
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
@@ -88,13 +84,8 @@ public abstract class CompressedBigDecimalSqlAggregatorBase
implements SqlAggreg
// fetch sum column expression
DruidExpression sumColumn = Expressions.toDruidExpression(
plannerContext,
- rowSignature,
- Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(0)
- )
+ inputAccessor.getInputRowSignature(),
+ inputAccessor.getField(aggregateCall.getArgList().get(0))
);
if (sumColumn == null) {
@@ -114,12 +105,7 @@ public abstract class
CompressedBigDecimalSqlAggregatorBase implements SqlAggreg
Integer size = null;
if (aggregateCall.getArgList().size() >= 2) {
- RexNode sizeArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(1)
- );
+ RexNode sizeArg =
inputAccessor.getField(aggregateCall.getArgList().get(1));
size = ((Number) RexLiteral.value(sizeArg)).intValue();
}
@@ -128,12 +114,7 @@ public abstract class
CompressedBigDecimalSqlAggregatorBase implements SqlAggreg
Integer scale = null;
if (aggregateCall.getArgList().size() >= 3) {
- RexNode scaleArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(2)
- );
+ RexNode scaleArg =
inputAccessor.getField(aggregateCall.getArgList().get(2));
scale = ((Number) RexLiteral.value(scaleArg)).intValue();
}
@@ -141,12 +122,7 @@ public abstract class
CompressedBigDecimalSqlAggregatorBase implements SqlAggreg
Boolean useStrictNumberParsing = null;
if (aggregateCall.getArgList().size() >= 4) {
- RexNode useStrictNumberParsingArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(3)
- );
+ RexNode useStrictNumberParsingArg =
inputAccessor.getField(aggregateCall.getArgList().get(3));
useStrictNumberParsing =
RexLiteral.booleanValue(useStrictNumberParsingArg);
}
diff --git
a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java
b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java
index ca0a4acc60..ebb6c7f4b1 100644
---
a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java
+++
b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestGenerateSketchSqlAggregator.java
@@ -20,8 +20,6 @@
package org.apache.druid.query.aggregation.tdigestsketch.sql;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -36,13 +34,12 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import
org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchAggregatorFactory;
import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchUtils;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -63,25 +60,18 @@ public class TDigestGenerateSketchSqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
- final RexNode inputOperand = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(0)
- );
+ final RexNode inputOperand =
inputAccessor.getField(aggregateCall.getArgList().get(0));
final DruidExpression input =
Aggregations.toDruidExpressionForNumericAggregator(
plannerContext,
- rowSignature,
+ inputAccessor.getInputRowSignature(),
inputOperand
);
if (input == null) {
@@ -93,12 +83,7 @@ public class TDigestGenerateSketchSqlAggregator implements
SqlAggregator
Integer compression = TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION;
if (aggregateCall.getArgList().size() > 1) {
- RexNode compressionOperand = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(1)
- );
+ RexNode compressionOperand =
inputAccessor.getField(aggregateCall.getArgList().get(1));
if (!compressionOperand.isA(SqlKind.LITERAL)) {
// compressionOperand must be a literal in order to plan.
return null;
diff --git
a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java
b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java
index 379e889d38..ee63444f6d 100644
---
a/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java
+++
b/extensions-contrib/tdigestsketch/src/main/java/org/apache/druid/query/aggregation/tdigestsketch/sql/TDigestSketchQuantileSqlAggregator.java
@@ -21,8 +21,6 @@ package org.apache.druid.query.aggregation.tdigestsketch.sql;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -39,13 +37,12 @@ import
org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchAggregatorF
import
org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchToQuantilePostAggregator;
import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchUtils;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -66,12 +63,10 @@ public class TDigestSketchQuantileSqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
@@ -79,13 +74,8 @@ public class TDigestSketchQuantileSqlAggregator implements
SqlAggregator
// This is expected to be a tdigest sketch
final DruidExpression input =
Aggregations.toDruidExpressionForNumericAggregator(
plannerContext,
- rowSignature,
- Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(0)
- )
+ inputAccessor.getInputRowSignature(),
+ inputAccessor.getField(aggregateCall.getArgList().get(0))
);
if (input == null) {
return null;
@@ -95,12 +85,7 @@ public class TDigestSketchQuantileSqlAggregator implements
SqlAggregator
final String sketchName = StringUtils.format("%s:agg", name);
// this is expected to be quantile fraction
- final RexNode quantileArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(1)
- );
+ final RexNode quantileArg =
inputAccessor.getField(aggregateCall.getArgList().get(1));
if (!quantileArg.isA(SqlKind.LITERAL)) {
// Quantile must be a literal in order to plan.
@@ -110,12 +95,7 @@ public class TDigestSketchQuantileSqlAggregator implements
SqlAggregator
final double quantile = ((Number)
RexLiteral.value(quantileArg)).floatValue();
Integer compression = TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION;
if (aggregateCall.getArgList().size() > 2) {
- final RexNode compressionArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(2)
- );
+ final RexNode compressionArg =
inputAccessor.getField(aggregateCall.getArgList().get(2));
compression = ((Number) RexLiteral.value(compressionArg)).intValue();
}
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java
index c6dd3e7afa..d221b72ac1 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java
@@ -20,9 +20,7 @@
package org.apache.druid.query.aggregation.datasketches.hll.sql;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
@@ -36,7 +34,6 @@ import
org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggrega
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
@@ -44,6 +41,7 @@ import
org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -66,38 +64,26 @@ public abstract class HllSketchBaseSqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
{
// Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't
let us use direct column access
// for string columns.
- final RexNode columnRexNode = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(0)
- );
+ final RexNode columnRexNode =
inputAccessor.getField(aggregateCall.getArgList().get(0));
- final DruidExpression columnArg =
Expressions.toDruidExpression(plannerContext, rowSignature, columnRexNode);
+ final DruidExpression columnArg =
Expressions.toDruidExpression(plannerContext,
inputAccessor.getInputRowSignature(), columnRexNode);
if (columnArg == null) {
return null;
}
final int logK;
if (aggregateCall.getArgList().size() >= 2) {
- final RexNode logKarg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(1)
- );
+ final RexNode logKarg =
inputAccessor.getField(aggregateCall.getArgList().get(1));
if (!logKarg.isA(SqlKind.LITERAL)) {
// logK must be a literal in order to plan.
@@ -111,12 +97,7 @@ public abstract class HllSketchBaseSqlAggregator implements
SqlAggregator
final String tgtHllType;
if (aggregateCall.getArgList().size() >= 3) {
- final RexNode tgtHllTypeArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(2)
- );
+ final RexNode tgtHllTypeArg =
inputAccessor.getField(aggregateCall.getArgList().get(2));
if (!tgtHllTypeArg.isA(SqlKind.LITERAL)) {
// tgtHllType must be a literal in order to plan.
@@ -132,9 +113,10 @@ public abstract class HllSketchBaseSqlAggregator
implements SqlAggregator
final String aggregatorName = finalizeAggregations ?
Calcites.makePrefixedName(name, "a") : name;
if (columnArg.isDirectColumnAccess()
- && rowSignature.getColumnType(columnArg.getDirectColumn())
- .map(type -> type.is(ValueType.COMPLEX))
- .orElse(false)) {
+ && inputAccessor.getInputRowSignature()
+ .getColumnType(columnArg.getDirectColumn())
+ .map(type -> type.is(ValueType.COMPLEX))
+ .orElse(false)) {
aggregatorFactory = new HllSketchMergeAggregatorFactory(
aggregatorName,
columnArg.getDirectColumn(),
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java
index 6c1b5720af..08c7a1b123 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java
@@ -21,8 +21,6 @@ package
org.apache.druid.query.aggregation.datasketches.quantiles.sql;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -37,14 +35,13 @@ import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAg
import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToQuantilePostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -75,25 +72,18 @@ public class DoublesSketchApproxQuantileSqlAggregator
implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
final DruidExpression input =
Aggregations.toDruidExpressionForNumericAggregator(
plannerContext,
- rowSignature,
- Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(0)
- )
+ inputAccessor.getInputRowSignature(),
+ inputAccessor.getField(aggregateCall.getArgList().get(0))
);
if (input == null) {
return null;
@@ -101,12 +91,7 @@ public class DoublesSketchApproxQuantileSqlAggregator
implements SqlAggregator
final AggregatorFactory aggregatorFactory;
final String histogramName = StringUtils.format("%s:agg", name);
- final RexNode probabilityArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(1)
- );
+ final RexNode probabilityArg =
inputAccessor.getField(aggregateCall.getArgList().get(1));
if (!probabilityArg.isA(SqlKind.LITERAL)) {
// Probability must be a literal in order to plan.
@@ -117,12 +102,7 @@ public class DoublesSketchApproxQuantileSqlAggregator
implements SqlAggregator
final int k;
if (aggregateCall.getArgList().size() >= 3) {
- final RexNode resolutionArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(2)
- );
+ final RexNode resolutionArg =
inputAccessor.getField(aggregateCall.getArgList().get(2));
if (!resolutionArg.isA(SqlKind.LITERAL)) {
// Resolution must be a literal in order to plan.
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java
index 049e1284a9..8331ab7206 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java
@@ -21,8 +21,6 @@ package
org.apache.druid.query.aggregation.datasketches.quantiles.sql;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -35,14 +33,13 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -71,25 +68,18 @@ public class DoublesSketchObjectSqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
final DruidExpression input =
Aggregations.toDruidExpressionForNumericAggregator(
plannerContext,
- rowSignature,
- Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(0)
- )
+ inputAccessor.getInputRowSignature(),
+ inputAccessor.getField(aggregateCall.getArgList().get(0))
);
if (input == null) {
return null;
@@ -100,12 +90,7 @@ public class DoublesSketchObjectSqlAggregator implements
SqlAggregator
final int k;
if (aggregateCall.getArgList().size() >= 2) {
- final RexNode resolutionArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(1)
- );
+ final RexNode resolutionArg =
inputAccessor.getField(aggregateCall.getArgList().get(1));
if (!resolutionArg.isA(SqlKind.LITERAL)) {
// Resolution must be a literal in order to plan.
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java
index 6564b276c9..bf35cd665a 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java
@@ -20,9 +20,7 @@
package org.apache.druid.query.aggregation.datasketches.theta.sql;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
@@ -34,7 +32,6 @@ import
org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregat
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
@@ -42,6 +39,7 @@ import
org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -60,38 +58,26 @@ public abstract class ThetaSketchBaseSqlAggregator
implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
{
// Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't
let us use direct column access
// for string columns.
- final RexNode columnRexNode = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(0)
- );
+ final RexNode columnRexNode =
inputAccessor.getField(aggregateCall.getArgList().get(0));
- final DruidExpression columnArg =
Expressions.toDruidExpression(plannerContext, rowSignature, columnRexNode);
+ final DruidExpression columnArg =
Expressions.toDruidExpression(plannerContext,
inputAccessor.getInputRowSignature(), columnRexNode);
if (columnArg == null) {
return null;
}
final int sketchSize;
if (aggregateCall.getArgList().size() >= 2) {
- final RexNode sketchSizeArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(1)
- );
+ final RexNode sketchSizeArg =
inputAccessor.getField(aggregateCall.getArgList().get(1));
if (!sketchSizeArg.isA(SqlKind.LITERAL)) {
// logK must be a literal in order to plan.
@@ -107,9 +93,10 @@ public abstract class ThetaSketchBaseSqlAggregator
implements SqlAggregator
final String aggregatorName = finalizeAggregations ?
Calcites.makePrefixedName(name, "a") : name;
if (columnArg.isDirectColumnAccess()
- && rowSignature.getColumnType(columnArg.getDirectColumn())
- .map(type -> type.is(ValueType.COMPLEX))
- .orElse(false)) {
+ && inputAccessor.getInputRowSignature()
+ .getColumnType(columnArg.getDirectColumn())
+ .map(type -> type.is(ValueType.COMPLEX))
+ .orElse(false)) {
aggregatorFactory = new SketchMergeAggregatorFactory(
aggregatorName,
columnArg.getDirectColumn(),
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/sql/ArrayOfDoublesSketchSqlAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/sql/ArrayOfDoublesSketchSqlAggregator.java
index 9d6ddac89a..a9b1aaa627 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/sql/ArrayOfDoublesSketchSqlAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/sql/ArrayOfDoublesSketchSqlAggregator.java
@@ -20,9 +20,7 @@
package org.apache.druid.query.aggregation.datasketches.tuple.sql;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -38,7 +36,6 @@ import
org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketc
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
@@ -46,6 +43,7 @@ import
org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -69,12 +67,10 @@ public class ArrayOfDoublesSketchSqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
@@ -86,12 +82,7 @@ public class ArrayOfDoublesSketchSqlAggregator implements
SqlAggregator
final int nominalEntries;
final int metricExpressionEndIndex;
final int lastArgIndex = argList.size() - 1;
- final RexNode potentialNominalEntriesArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- argList.get(lastArgIndex)
- );
+ final RexNode potentialNominalEntriesArg =
inputAccessor.getField(argList.get(lastArgIndex));
if (potentialNominalEntriesArg.isA(SqlKind.LITERAL) &&
RexLiteral.value(potentialNominalEntriesArg) instanceof Number) {
@@ -107,16 +98,11 @@ public class ArrayOfDoublesSketchSqlAggregator implements
SqlAggregator
for (int i = 0; i <= metricExpressionEndIndex; i++) {
final String fieldName;
- final RexNode columnRexNode = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- argList.get(i)
- );
+ final RexNode columnRexNode = inputAccessor.getField(argList.get(i));
final DruidExpression columnArg = Expressions.toDruidExpression(
plannerContext,
- rowSignature,
+ inputAccessor.getInputRowSignature(),
columnRexNode
);
if (columnArg == null) {
@@ -124,9 +110,10 @@ public class ArrayOfDoublesSketchSqlAggregator implements
SqlAggregator
}
if (columnArg.isDirectColumnAccess() &&
- rowSignature.getColumnType(columnArg.getDirectColumn())
- .map(type -> type.is(ValueType.COMPLEX))
- .orElse(false)) {
+ inputAccessor.getInputRowSignature()
+ .getColumnType(columnArg.getDirectColumn())
+ .map(type -> type.is(ValueType.COMPLEX))
+ .orElse(false)) {
fieldName = columnArg.getDirectColumn();
} else {
final RelDataType dataType = columnRexNode.getType();
diff --git
a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java
b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java
index 0ec265595e..6a1ca49067 100644
---
a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java
+++
b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregator.java
@@ -20,8 +20,6 @@
package org.apache.druid.query.aggregation.bloom.sql;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -38,13 +36,13 @@ import
org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -65,25 +63,18 @@ public class BloomFilterSqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
{
- final RexNode inputOperand = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(0)
- );
+ final RexNode inputOperand =
inputAccessor.getField(aggregateCall.getArgList().get(0));
final DruidExpression input = Expressions.toDruidExpression(
plannerContext,
- rowSignature,
+ inputAccessor.getInputRowSignature(),
inputOperand
);
if (input == null) {
@@ -92,12 +83,7 @@ public class BloomFilterSqlAggregator implements
SqlAggregator
final AggregatorFactory aggregatorFactory;
final String aggName = StringUtils.format("%s:agg", name);
- final RexNode maxNumEntriesOperand = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(1)
- );
+ final RexNode maxNumEntriesOperand =
inputAccessor.getField(aggregateCall.getArgList().get(1));
if (!maxNumEntriesOperand.isA(SqlKind.LITERAL)) {
// maxNumEntriesOperand must be a literal in order to plan.
diff --git
a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregator.java
b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregator.java
index 3f0bd14f84..fdc61796c4 100644
---
a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregator.java
+++
b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregator.java
@@ -21,8 +21,6 @@ package org.apache.druid.query.aggregation.histogram.sql;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -38,13 +36,12 @@ import
org.apache.druid.query.aggregation.histogram.FixedBucketsHistogram;
import
org.apache.druid.query.aggregation.histogram.FixedBucketsHistogramAggregatorFactory;
import org.apache.druid.query.aggregation.histogram.QuantilePostAggregator;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -65,25 +62,18 @@ public class FixedBucketsHistogramQuantileSqlAggregator
implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
{
final DruidExpression input =
Aggregations.toDruidExpressionForNumericAggregator(
plannerContext,
- rowSignature,
- Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(0)
- )
+ inputAccessor.getInputRowSignature(),
+ inputAccessor.getField(aggregateCall.getArgList().get(0))
);
if (input == null) {
return null;
@@ -91,12 +81,7 @@ public class FixedBucketsHistogramQuantileSqlAggregator
implements SqlAggregator
final AggregatorFactory aggregatorFactory;
final String histogramName = StringUtils.format("%s:agg", name);
- final RexNode probabilityArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(1)
- );
+ final RexNode probabilityArg =
inputAccessor.getField(aggregateCall.getArgList().get(1));
if (!probabilityArg.isA(SqlKind.LITERAL)) {
// Probability must be a literal in order to plan.
@@ -107,12 +92,7 @@ public class FixedBucketsHistogramQuantileSqlAggregator
implements SqlAggregator
final int numBuckets;
if (aggregateCall.getArgList().size() >= 3) {
- final RexNode numBucketsArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(2)
- );
+ final RexNode numBucketsArg =
inputAccessor.getField(aggregateCall.getArgList().get(2));
if (!numBucketsArg.isA(SqlKind.LITERAL)) {
// Resolution must be a literal in order to plan.
@@ -126,12 +106,7 @@ public class FixedBucketsHistogramQuantileSqlAggregator
implements SqlAggregator
final double lowerLimit;
if (aggregateCall.getArgList().size() >= 4) {
- final RexNode lowerLimitArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(3)
- );
+ final RexNode lowerLimitArg =
inputAccessor.getField(aggregateCall.getArgList().get(3));
if (!lowerLimitArg.isA(SqlKind.LITERAL)) {
// Resolution must be a literal in order to plan.
@@ -145,12 +120,7 @@ public class FixedBucketsHistogramQuantileSqlAggregator
implements SqlAggregator
final double upperLimit;
if (aggregateCall.getArgList().size() >= 5) {
- final RexNode upperLimitArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(4)
- );
+ final RexNode upperLimitArg =
inputAccessor.getField(aggregateCall.getArgList().get(4));
if (!upperLimitArg.isA(SqlKind.LITERAL)) {
// Resolution must be a literal in order to plan.
@@ -164,12 +134,7 @@ public class FixedBucketsHistogramQuantileSqlAggregator
implements SqlAggregator
final FixedBucketsHistogram.OutlierHandlingMode outlierHandlingMode;
if (aggregateCall.getArgList().size() >= 6) {
- final RexNode outlierHandlingModeArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(5)
- );
+ final RexNode outlierHandlingModeArg =
inputAccessor.getField(aggregateCall.getArgList().get(5));
if (!outlierHandlingModeArg.isA(SqlKind.LITERAL)) {
// Resolution must be a literal in order to plan.
diff --git
a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
index 9ba7604d70..a3fe8dc545 100644
---
a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
+++
b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java
@@ -21,8 +21,6 @@ package org.apache.druid.query.aggregation.histogram.sql;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -39,14 +37,13 @@ import
org.apache.druid.query.aggregation.histogram.ApproximateHistogramAggregat
import
org.apache.druid.query.aggregation.histogram.ApproximateHistogramFoldingAggregatorFactory;
import org.apache.druid.query.aggregation.histogram.QuantilePostAggregator;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -67,25 +64,18 @@ public class QuantileSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
final DruidExpression input =
Aggregations.toDruidExpressionForNumericAggregator(
plannerContext,
- rowSignature,
- Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(0)
- )
+ inputAccessor.getInputRowSignature(),
+ inputAccessor.getField(aggregateCall.getArgList().get(0))
);
if (input == null) {
return null;
@@ -93,12 +83,7 @@ public class QuantileSqlAggregator implements SqlAggregator
final AggregatorFactory aggregatorFactory;
final String histogramName = StringUtils.format("%s:agg", name);
- final RexNode probabilityArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(1)
- );
+ final RexNode probabilityArg =
inputAccessor.getField(aggregateCall.getArgList().get(1));
if (!probabilityArg.isA(SqlKind.LITERAL)) {
// Probability must be a literal in order to plan.
@@ -109,12 +94,7 @@ public class QuantileSqlAggregator implements SqlAggregator
final int resolution;
if (aggregateCall.getArgList().size() >= 3) {
- final RexNode resolutionArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(2)
- );
+ final RexNode resolutionArg =
inputAccessor.getField(aggregateCall.getArgList().get(2));
if (!resolutionArg.isA(SqlKind.LITERAL)) {
// Resolution must be a literal in order to plan.
@@ -170,7 +150,10 @@ public class QuantileSqlAggregator implements SqlAggregator
// No existing match found. Create a new one.
if (input.isDirectColumnAccess()) {
- if (rowSignature.getColumnType(input.getDirectColumn()).map(type ->
type.is(ValueType.COMPLEX)).orElse(false)) {
+ if (inputAccessor.getInputRowSignature()
+ .getColumnType(input.getDirectColumn())
+ .map(type -> type.is(ValueType.COMPLEX))
+ .orElse(false)) {
aggregatorFactory = new ApproximateHistogramFoldingAggregatorFactory(
histogramName,
input.getDirectColumn(),
diff --git
a/extensions-core/stats/src/main/java/org/apache/druid/query/aggregation/variance/sql/BaseVarianceSqlAggregator.java
b/extensions-core/stats/src/main/java/org/apache/druid/query/aggregation/variance/sql/BaseVarianceSqlAggregator.java
index ee8c469c3b..b2ed565d62 100644
---
a/extensions-core/stats/src/main/java/org/apache/druid/query/aggregation/variance/sql/BaseVarianceSqlAggregator.java
+++
b/extensions-core/stats/src/main/java/org/apache/druid/query/aggregation/variance/sql/BaseVarianceSqlAggregator.java
@@ -21,9 +21,7 @@ package org.apache.druid.query.aggregation.variance.sql;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
@@ -40,15 +38,14 @@ import
org.apache.druid.query.aggregation.variance.VarianceAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignatures;
@@ -77,25 +74,19 @@ public abstract class BaseVarianceSqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
{
- final RexNode inputOperand = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(0)
- );
+ final RexNode inputOperand =
inputAccessor.getField(aggregateCall.getArgList().get(0));
+
final DruidExpression input =
Aggregations.toDruidExpressionForNumericAggregator(
plannerContext,
- rowSignature,
+ inputAccessor.getInputRowSignature(),
inputOperand
);
if (input == null) {
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java
index 3a3e43dd7b..5c06332a9b 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java
@@ -20,14 +20,13 @@
package org.apache.druid.sql.calcite.aggregation;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import javax.annotation.Nullable;
import java.util.List;
@@ -48,28 +47,24 @@ public class Aggregations
*
* 1) They can take direct field accesses or expressions as inputs.
* 2) They cannot implicitly cast strings to numbers when using a direct
field access.
- *
* @param plannerContext SQL planner context
- * @param rowSignature input row signature
* @param call aggregate call object
- * @param project project that should be applied before aggregation;
may be null
+ * @param inputAccessor gives access to input fields and schema
*
* @return list of expressions corresponding to aggregator arguments, or
null if any cannot be translated
*/
@Nullable
public static List<DruidExpression> getArgumentsForSimpleAggregator(
- final RexBuilder rexBuilder,
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final AggregateCall call,
- @Nullable final Project project
+ final InputAccessor inputAccessor
)
{
final List<DruidExpression> args = call
.getArgList()
.stream()
- .map(i -> Expressions.fromFieldAccess(rexBuilder.getTypeFactory(),
rowSignature, project, i))
- .map(rexNode -> toDruidExpressionForNumericAggregator(plannerContext,
rowSignature, rexNode))
+ .map(i -> inputAccessor.getField(i))
+ .map(rexNode -> toDruidExpressionForNumericAggregator(plannerContext,
inputAccessor.getInputRowSignature(), rexNode))
.collect(Collectors.toList());
if (args.stream().noneMatch(Objects::isNull)) {
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java
index 0ff7972657..eceb4ebbf8 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java
@@ -20,8 +20,6 @@
package org.apache.druid.sql.calcite.aggregation;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
@@ -30,8 +28,8 @@ import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Optionality;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -66,24 +64,20 @@ public class ApproxCountDistinctSqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
{
return delegate.toDruidAggregation(
plannerContext,
- rowSignature,
virtualColumnRegistry,
- rexBuilder,
name,
aggregateCall,
- project,
+ inputAccessor,
existingAggregations,
finalizeAggregations
);
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/SqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/SqlAggregator.java
index d21f6ebb75..ec494a2fec 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/SqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/SqlAggregator.java
@@ -25,6 +25,7 @@ import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -42,6 +43,44 @@ public interface SqlAggregator
*/
SqlAggFunction calciteFunction();
+ /**
+ * Returns a Druid Aggregation corresponding to a SQL {@link AggregateCall}.
This method should ignore filters;
+ * they will be applied to your aggregator in a later step.
+ *
+ * @param plannerContext SQL planner context
+ * @param virtualColumnRegistry re-usable virtual column references
+ * @param name desired output name of the aggregation
+ * @param aggregateCall aggregate call object
+ * @param inputAccessor gives access to input fields and schema
+ * @param existingAggregations existing aggregations for this query; useful
for re-using aggregations. May be safely
+ * ignored if you do not want to re-use
existing aggregations.
+ * @param finalizeAggregations true if this query should include explicit
finalization for all of its
+ * aggregators, where required. This is set for
subqueries where Druid's native query
+ * layer does not do this automatically.
+ * @return aggregation, or null if the call cannot be translated
+ */
+ @Nullable
+ default Aggregation toDruidAggregation(
+ PlannerContext plannerContext,
+ VirtualColumnRegistry virtualColumnRegistry,
+ String name,
+ AggregateCall aggregateCall,
+ InputAccessor inputAccessor,
+ List<Aggregation> existingAggregations,
+ boolean finalizeAggregations
+ )
+ {
+ return toDruidAggregation(plannerContext,
+ inputAccessor.getInputRowSignature(),
+ virtualColumnRegistry,
+ inputAccessor.getRexBuilder(),
+ name,
+ aggregateCall,
+ inputAccessor.getProject(),
+ existingAggregations,
+ finalizeAggregations);
+ }
+
/**
* Returns a Druid Aggregation corresponding to a SQL {@link AggregateCall}.
This method should ignore filters;
* they will be applied to your aggregator in a later step.
@@ -62,7 +101,7 @@ public interface SqlAggregator
* @return aggregation, or null if the call cannot be translated
*/
@Nullable
- Aggregation toDruidAggregation(
+ default Aggregation toDruidAggregation(
PlannerContext plannerContext,
RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
@@ -72,5 +111,8 @@ public interface SqlAggregator
Project project,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
- );
+ )
+ {
+ throw new RuntimeException("unimplemented fallback method!");
+ }
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ArrayConcatSqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ArrayConcatSqlAggregator.java
index ed6652181e..be21701d1e 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ArrayConcatSqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ArrayConcatSqlAggregator.java
@@ -21,8 +21,6 @@ package org.apache.druid.sql.calcite.aggregation.builtin;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -39,18 +37,17 @@ import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
import java.util.List;
-import java.util.stream.Collectors;
public class ArrayConcatSqlAggregator implements SqlAggregator
{
@@ -67,21 +64,15 @@ public class ArrayConcatSqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
{
- final List<RexNode> arguments = aggregateCall
- .getArgList()
- .stream()
- .map(i -> Expressions.fromFieldAccess(rexBuilder.getTypeFactory(),
rowSignature, project, i))
- .collect(Collectors.toList());
+ final List<RexNode> arguments =
inputAccessor.getFields(aggregateCall.getArgList());
Integer maxSizeBytes = null;
if (arguments.size() > 1) {
@@ -92,7 +83,7 @@ public class ArrayConcatSqlAggregator implements SqlAggregator
}
maxSizeBytes = ((Number) RexLiteral.value(maxBytes)).intValue();
}
- final DruidExpression arg = Expressions.toDruidExpression(plannerContext,
rowSignature, arguments.get(0));
+ final DruidExpression arg = Expressions.toDruidExpression(plannerContext,
inputAccessor.getInputRowSignature(), arguments.get(0));
final ExprMacroTable macroTable =
plannerContext.getPlannerToolbox().exprMacroTable();
final String fieldName;
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ArraySqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ArraySqlAggregator.java
index 5136ed3c94..9af5210905 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ArraySqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/ArraySqlAggregator.java
@@ -21,9 +21,7 @@ package org.apache.druid.sql.calcite.aggregation.builtin;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -41,18 +39,17 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
import java.util.List;
-import java.util.stream.Collectors;
public class ArraySqlAggregator implements SqlAggregator
{
@@ -69,21 +66,16 @@ public class ArraySqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
{
- final List<RexNode> arguments = aggregateCall
- .getArgList()
- .stream()
- .map(i -> Expressions.fromFieldAccess(rexBuilder.getTypeFactory(),
rowSignature, project, i))
- .collect(Collectors.toList());
+ final List<RexNode> arguments =
+ inputAccessor.getFields(aggregateCall.getArgList());
Integer maxSizeBytes = null;
if (arguments.size() > 1) {
@@ -94,7 +86,7 @@ public class ArraySqlAggregator implements SqlAggregator
}
maxSizeBytes = ((Number) RexLiteral.value(maxBytes)).intValue();
}
- final DruidExpression arg = Expressions.toDruidExpression(plannerContext,
rowSignature, arguments.get(0));
+ final DruidExpression arg = Expressions.toDruidExpression(plannerContext,
inputAccessor.getInputRowSignature(), arguments.get(0));
if (arg == null) {
// can't translate argument
return null;
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
index a938bdca0b..3814f8d9ad 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java
@@ -22,8 +22,6 @@ package org.apache.druid.sql.calcite.aggregation.builtin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -33,14 +31,13 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -58,23 +55,19 @@ public class AvgSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
final List<DruidExpression> arguments =
Aggregations.getArgumentsForSimpleAggregator(
- rexBuilder,
plannerContext,
- rowSignature,
aggregateCall,
- project
+ inputAccessor
);
if (arguments == null) {
@@ -85,11 +78,11 @@ public class AvgSqlAggregator implements SqlAggregator
final AggregatorFactory count =
CountSqlAggregator.createCountAggregatorFactory(
countName,
plannerContext,
- rowSignature,
+ inputAccessor.getInputRowSignature(),
virtualColumnRegistry,
- rexBuilder,
+ inputAccessor.getRexBuilder(),
aggregateCall,
- project
+ inputAccessor
);
final DruidExpression arg = Iterables.getOnlyElement(arguments);
@@ -108,12 +101,8 @@ public class AvgSqlAggregator implements SqlAggregator
if (arg.isDirectColumnAccess()) {
fieldName = arg.getDirectColumn();
} else {
- final RexNode resolutionArg = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- Iterables.getOnlyElement(aggregateCall.getArgList())
- );
+ final RexNode resolutionArg = inputAccessor.getField(
+ Iterables.getOnlyElement(aggregateCall.getArgList()));
fieldName =
virtualColumnRegistry.getOrCreateVirtualColumnForExpression(arg,
resolutionArg.getType());
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BitwiseSqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BitwiseSqlAggregator.java
index d8758141df..a5c7fb61cf 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BitwiseSqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BitwiseSqlAggregator.java
@@ -21,8 +21,6 @@ package org.apache.druid.sql.calcite.aggregation.builtin;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
@@ -41,12 +39,12 @@ import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.NullFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -122,12 +120,10 @@ public class BitwiseSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
@@ -135,8 +131,8 @@ public class BitwiseSqlAggregator implements SqlAggregator
final List<DruidExpression> arguments = aggregateCall
.getArgList()
.stream()
- .map(i -> Expressions.fromFieldAccess(rexBuilder.getTypeFactory(),
rowSignature, project, i))
- .map(rexNode -> Expressions.toDruidExpression(plannerContext,
rowSignature, rexNode))
+ .map(i -> inputAccessor.getField(i))
+ .map(rexNode -> Expressions.toDruidExpression(plannerContext,
inputAccessor.getInputRowSignature(), rexNode))
.collect(Collectors.toList());
if (arguments.stream().anyMatch(Objects::isNull)) {
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BuiltinApproxCountDistinctSqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BuiltinApproxCountDistinctSqlAggregator.java
index e4dedd95ce..699c7a8d1c 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BuiltinApproxCountDistinctSqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BuiltinApproxCountDistinctSqlAggregator.java
@@ -22,9 +22,7 @@ package org.apache.druid.sql.calcite.aggregation.builtin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
@@ -42,7 +40,6 @@ import
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFact
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
@@ -50,6 +47,7 @@ import
org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -72,26 +70,20 @@ public class BuiltinApproxCountDistinctSqlAggregator
implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
// Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't
let us use direct column access
// for string columns.
- final RexNode rexNode = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- Iterables.getOnlyElement(aggregateCall.getArgList())
- );
+ final RexNode rexNode = inputAccessor.getField(
+ Iterables.getOnlyElement(aggregateCall.getArgList()));
- final DruidExpression arg = Expressions.toDruidExpression(plannerContext,
rowSignature, rexNode);
+ final DruidExpression arg = Expressions.toDruidExpression(plannerContext,
inputAccessor.getInputRowSignature(), rexNode);
if (arg == null) {
return null;
}
@@ -100,7 +92,10 @@ public class BuiltinApproxCountDistinctSqlAggregator
implements SqlAggregator
final String aggregatorName = finalizeAggregations ?
Calcites.makePrefixedName(name, "a") : name;
if (arg.isDirectColumnAccess()
- && rowSignature.getColumnType(arg.getDirectColumn()).map(type ->
type.is(ValueType.COMPLEX)).orElse(false)) {
+ && inputAccessor.getInputRowSignature()
+ .getColumnType(arg.getDirectColumn())
+ .map(type -> type.is(ValueType.COMPLEX))
+ .orElse(false)) {
aggregatorFactory = new HyperUniquesAggregatorFactory(aggregatorName,
arg.getDirectColumn(), false, true);
} else {
final RelDataType dataType = rexNode.getType();
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
index edc7e3ce50..c28ac8eebb 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java
@@ -22,7 +22,6 @@ package org.apache.druid.sql.calcite.aggregation.builtin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -40,6 +39,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -69,15 +69,10 @@ public class CountSqlAggregator implements SqlAggregator
final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
final AggregateCall aggregateCall,
- final Project project
+ final InputAccessor inputAccessor
)
{
- final RexNode rexNode = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- Iterables.getOnlyElement(aggregateCall.getArgList())
- );
+ final RexNode rexNode =
inputAccessor.getField(Iterables.getOnlyElement(aggregateCall.getArgList()));
if (rexNode.getType().isNullable()) {
final DimFilter nonNullFilter = Expressions.toFilter(
@@ -102,28 +97,25 @@ public class CountSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
final List<DruidExpression> args =
Aggregations.getArgumentsForSimpleAggregator(
- rexBuilder,
plannerContext,
- rowSignature,
aggregateCall,
- project
+ inputAccessor
);
if (args == null) {
return null;
}
+ // FIXME: is-all-literal
if (args.isEmpty()) {
// COUNT(*)
return Aggregation.create(new CountAggregatorFactory(name));
@@ -132,12 +124,10 @@ public class CountSqlAggregator implements SqlAggregator
if (plannerContext.getPlannerConfig().isUseApproximateCountDistinct()) {
return approxCountDistinctAggregator.toDruidAggregation(
plannerContext,
- rowSignature,
virtualColumnRegistry,
- rexBuilder,
name,
aggregateCall,
- project,
+ inputAccessor,
existingAggregations,
finalizeAggregations
);
@@ -150,11 +140,11 @@ public class CountSqlAggregator implements SqlAggregator
AggregatorFactory theCount = createCountAggregatorFactory(
name,
plannerContext,
- rowSignature,
+ inputAccessor.getInputRowSignature(),
virtualColumnRegistry,
- rexBuilder,
+ inputAccessor.getRexBuilder(),
aggregateCall,
- project
+ inputAccessor
);
return Aggregation.create(theCount);
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
index 5f1b3c3228..abaeede994 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
@@ -20,9 +20,7 @@
package org.apache.druid.sql.calcite.aggregation.builtin;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -53,19 +51,18 @@ import
org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
import
org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
public class EarliestLatestAnySqlAggregator implements SqlAggregator
{
@@ -180,23 +177,17 @@ public class EarliestLatestAnySqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
- final List<RexNode> rexNodes = aggregateCall
- .getArgList()
- .stream()
- .map(i -> Expressions.fromFieldAccess(rexBuilder.getTypeFactory(),
rowSignature, project, i))
- .collect(Collectors.toList());
+ final List<RexNode> rexNodes =
inputAccessor.getFields(aggregateCall.getArgList());
- final List<DruidExpression> args =
Expressions.toDruidExpressions(plannerContext, rowSignature, rexNodes);
+ final List<DruidExpression> args =
Expressions.toDruidExpressions(plannerContext,
inputAccessor.getInputRowSignature(), rexNodes);
if (args == null) {
return null;
@@ -216,7 +207,8 @@ public class EarliestLatestAnySqlAggregator implements
SqlAggregator
final String fieldName = getColumnName(plannerContext,
virtualColumnRegistry, args.get(0), rexNodes.get(0));
- if (!rowSignature.contains(ColumnHolder.TIME_COLUMN_NAME) &&
(aggregatorType == AggregatorType.LATEST || aggregatorType ==
AggregatorType.EARLIEST)) {
+ if
(!inputAccessor.getInputRowSignature().contains(ColumnHolder.TIME_COLUMN_NAME)
+ && (aggregatorType == AggregatorType.LATEST || aggregatorType ==
AggregatorType.EARLIEST)) {
// This code is being run as part of the exploratory volcano planner,
currently, the definition of these
// aggregators does not tell Calcite that they depend on a __time column
being in existence, instead we are
// allowing the volcano planner to explore paths that put projections
which eliminate the time column in between
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java
index 95b70e1f1e..c12be459cf 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java
@@ -20,8 +20,6 @@
package org.apache.druid.sql.calcite.aggregation.builtin;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -38,19 +36,18 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import
org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
public class EarliestLatestBySqlAggregator implements SqlAggregator
{
@@ -76,23 +73,17 @@ public class EarliestLatestBySqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
- final List<RexNode> rexNodes = aggregateCall
- .getArgList()
- .stream()
- .map(i -> Expressions.fromFieldAccess(rexBuilder.getTypeFactory(),
rowSignature, project, i))
- .collect(Collectors.toList());
+ final List<RexNode> rexNodes =
inputAccessor.getFields(aggregateCall.getArgList());
- final List<DruidExpression> args =
Expressions.toDruidExpressions(plannerContext, rowSignature, rexNodes);
+ final List<DruidExpression> args =
Expressions.toDruidExpressions(plannerContext,
inputAccessor.getInputRowSignature(), rexNodes);
if (args == null) {
return null;
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/GroupingSqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/GroupingSqlAggregator.java
index 156c3995c6..ec829df11d 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/GroupingSqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/GroupingSqlAggregator.java
@@ -22,7 +22,6 @@ package org.apache.druid.sql.calcite.aggregation.builtin;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -34,6 +33,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -53,24 +53,22 @@ public class GroupingSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
- List<Aggregation> existingAggregations,
- boolean finalizeAggregations
+ final InputAccessor inputAccessor,
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
)
{
List<String> arguments = aggregateCall.getArgList()
.stream()
.map(i -> getColumnName(
plannerContext,
- rowSignature,
- project,
+
inputAccessor.getInputRowSignature(),
+ inputAccessor.getProject(),
virtualColumnRegistry,
- rexBuilder.getTypeFactory(),
+
inputAccessor.getRexBuilder().getTypeFactory(),
i
))
.filter(Objects::nonNull)
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/LiteralSqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/LiteralSqlAggregator.java
index 0eb2c1085c..6e7de762b2 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/LiteralSqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/LiteralSqlAggregator.java
@@ -21,18 +21,16 @@ package org.apache.druid.sql.calcite.aggregation.builtin;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.fun.SqlInternalOperators;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -59,12 +57,10 @@ public class LiteralSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
@@ -73,7 +69,7 @@ public class LiteralSqlAggregator implements SqlAggregator
return null;
}
final RexNode literal = aggregateCall.rexList.get(0);
- final DruidExpression expr = Expressions.toDruidExpression(plannerContext,
rowSignature, literal);
+ final DruidExpression expr = Expressions.toDruidExpression(plannerContext,
inputAccessor.getInputRowSignature(), literal);
if (expr == null) {
return null;
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SimpleSqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SimpleSqlAggregator.java
index 0178266866..5da064c285 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SimpleSqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/SimpleSqlAggregator.java
@@ -21,18 +21,16 @@ package org.apache.druid.sql.calcite.aggregation.builtin;
import com.google.common.collect.Iterables;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -57,12 +55,10 @@ public abstract class SimpleSqlAggregator implements
SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
- final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
- final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
@@ -72,11 +68,9 @@ public abstract class SimpleSqlAggregator implements
SqlAggregator
}
final List<DruidExpression> arguments =
Aggregations.getArgumentsForSimpleAggregator(
- rexBuilder,
plannerContext,
- rowSignature,
aggregateCall,
- project
+ inputAccessor
);
if (arguments == null) {
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/StringSqlAggregator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/StringSqlAggregator.java
index b391100ff3..7c1389de3f 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/StringSqlAggregator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/StringSqlAggregator.java
@@ -21,9 +21,7 @@ package org.apache.druid.sql.calcite.aggregation.builtin;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -47,13 +45,13 @@ import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.NullFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignatures;
@@ -89,12 +87,10 @@ public class StringSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
@@ -102,20 +98,15 @@ public class StringSqlAggregator implements SqlAggregator
final List<DruidExpression> arguments = aggregateCall
.getArgList()
.stream()
- .map(i -> Expressions.fromFieldAccess(rexBuilder.getTypeFactory(),
rowSignature, project, i))
- .map(rexNode -> Expressions.toDruidExpression(plannerContext,
rowSignature, rexNode))
+ .map(i -> inputAccessor.getField(i))
+ .map(rexNode -> Expressions.toDruidExpression(plannerContext,
inputAccessor.getInputRowSignature(), rexNode))
.collect(Collectors.toList());
if (arguments.stream().anyMatch(Objects::isNull)) {
return null;
}
- RexNode separatorNode = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(1)
- );
+ RexNode separatorNode =
inputAccessor.getField(aggregateCall.getArgList().get(1));
if (!separatorNode.isA(SqlKind.LITERAL)) {
// separator must be a literal
return null;
@@ -133,12 +124,7 @@ public class StringSqlAggregator implements SqlAggregator
Integer maxSizeBytes = null;
if (arguments.size() > 2) {
- RexNode maxBytes = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- aggregateCall.getArgList().get(2)
- );
+ RexNode maxBytes =
inputAccessor.getField(aggregateCall.getArgList().get(2));
if (!maxBytes.isA(SqlKind.LITERAL)) {
// maxBytes must be a literal
return null;
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/expression/WindowSqlAggregate.java
b/sql/src/main/java/org/apache/druid/sql/calcite/expression/WindowSqlAggregate.java
index 7dd158d91f..00cd391eab 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/expression/WindowSqlAggregate.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/expression/WindowSqlAggregate.java
@@ -20,14 +20,12 @@
package org.apache.druid.sql.calcite.expression;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -55,12 +53,10 @@ public class WindowSqlAggregate implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
- RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
- RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
- Project project,
+ InputAccessor inputAccessor,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 9c41d79070..1cf79b6dc1 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -580,7 +580,11 @@ public class DruidQuery
rowSignature,
virtualColumnRegistry,
rexBuilder,
- partialQuery.getSelectProject(),
+ InputAccessor.buildFor(
+ rexBuilder,
+ rowSignature,
+ partialQuery.getSelectProject(),
+ null),
aggregations,
aggName,
aggCall,
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/InputAccessor.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/InputAccessor.java
new file mode 100644
index 0000000000..57b81c6853
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/InputAccessor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rel;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.expression.Expressions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Enables simpler access to input expressions.
+ *
+ * In case of aggregates it provides the constants transparently for
aggregates.
+ */
+public class InputAccessor
+{
+ private final Project project;
+ private final ImmutableList<RexLiteral> constants;
+ private final RexBuilder rexBuilder;
+ private final RowSignature inputRowSignature;
+ private final int inputFieldCount;
+
+ public static InputAccessor buildFor(
+ RexBuilder rexBuilder,
+ RowSignature inputRowSignature,
+ @Nullable Project project,
+ @Nullable ImmutableList<RexLiteral> constants)
+ {
+ return new InputAccessor(rexBuilder, inputRowSignature, project,
constants);
+ }
+
+ private InputAccessor(
+ RexBuilder rexBuilder,
+ RowSignature inputRowSignature,
+ Project project,
+ ImmutableList<RexLiteral> constants)
+ {
+ this.rexBuilder = rexBuilder;
+ this.inputRowSignature = inputRowSignature;
+ this.project = project;
+ this.constants = constants;
+ this.inputFieldCount = project != null ?
project.getRowType().getFieldCount() : inputRowSignature.size();
+ }
+
+ public RexNode getField(int argIndex)
+ {
+
+ if (argIndex < inputFieldCount) {
+ return Expressions.fromFieldAccess(
+ rexBuilder.getTypeFactory(),
+ inputRowSignature,
+ project,
+ argIndex);
+ } else {
+ return constants.get(argIndex - inputFieldCount);
+ }
+ }
+
+ public List<RexNode> getFields(List<Integer> argList)
+ {
+ return argList
+ .stream()
+ .map(i -> getField(i))
+ .collect(Collectors.toList());
+ }
+
+ public @Nullable Project getProject()
+ {
+ return project;
+ }
+
+
+ public RexBuilder getRexBuilder()
+ {
+ return rexBuilder;
+ }
+
+
+ public RowSignature getInputRowSignature()
+ {
+ return inputRowSignature;
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
index 07c5544441..4039ca8914 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
@@ -177,7 +177,11 @@ public class Windowing
sourceRowSignature,
null,
rexBuilder,
- partialQuery.getSelectProject(),
+ InputAccessor.buildFor(
+ rexBuilder,
+ sourceRowSignature,
+ partialQuery.getSelectProject(),
+ window.constants),
Collections.emptyList(),
aggName,
aggregateCall,
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
index 50bdf80771..fecabd00ec 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/GroupByRules.java
@@ -20,7 +20,6 @@
package org.apache.druid.sql.calcite.rule;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -32,6 +31,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
@@ -58,7 +58,7 @@ public class GroupByRules
final RowSignature rowSignature,
@Nullable final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
- final Project project,
+ final InputAccessor inputAccessor,
final List<Aggregation> existingAggregations,
final String name,
final AggregateCall call,
@@ -74,11 +74,7 @@ public class GroupByRules
if (call.filterArg >= 0) {
// AGG(xxx) FILTER(WHERE yyy)
- final RexNode expression = Expressions.fromFieldAccess(
- rexBuilder.getTypeFactory(),
- rowSignature,
- project,
- call.filterArg);
+ final RexNode expression = inputAccessor.getField(call.filterArg);
final DimFilter nonOptimizedFilter = Expressions.toFilter(
plannerContext,
@@ -136,12 +132,10 @@ public class GroupByRules
final Aggregation retVal = sqlAggregator.toDruidAggregation(
plannerContext,
- rowSignature,
virtualColumnRegistry,
- rexBuilder,
name,
call,
- project,
+ inputAccessor,
existingAggregationsWithSameFilter,
finalizeAggregations
);
diff --git
a/sql/src/test/resources/calcite/tests/window/aggregateConstant.sqlTest
b/sql/src/test/resources/calcite/tests/window/aggregateConstant.sqlTest
new file mode 100644
index 0000000000..16dbe924fd
--- /dev/null
+++ b/sql/src/test/resources/calcite/tests/window/aggregateConstant.sqlTest
@@ -0,0 +1,26 @@
+type: "operatorValidation"
+
+sql: |
+ SELECT
+ dim1,
+ count(333) OVER () cc
+ FROM foo
+ WHERE length(dim1)>0
+
+expectedOperators:
+ - type: naivePartition
+ partitionColumns: []
+ - type: "window"
+ processor:
+ type: "framedAgg"
+ frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0,
uppUnbounded: true, uppOffset: 0 }
+ aggregations:
+ - { type: "count", name: "w0" }
+
+expectedResults:
+ - ["10.1",5]
+ - ["2",5]
+ - ["1",5]
+ - ["def",5]
+ - ["abc",5]
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]