This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ad32a2a5ec adding in AggregateNode and related contents. (#8946)
ad32a2a5ec is described below
commit ad32a2a5ec113165217066ff9dd2a8b5d297ead1
Author: Rong Rong <[email protected]>
AuthorDate: Thu Jun 30 12:44:35 2022 -0700
adding in AggregateNode and related contents. (#8946)
- adding agg split rules
- adding agg split rules for leaf-intermediate split
- adding agg operator conversion from input groups to intermediate
group
- support agg after JOIN as well
- adding in agg without group by as well
- fix add vs. replace selectList
- also support multi-column group by key
- misc
- support multi-column JOIN
- also fix hash distribution rule
- validate that transform actually works with group-by
Co-authored-by: Rong Rong <[email protected]>
---
.../pinot/common/utils/request/RequestUtils.java | 2 +-
.../tests/MultiStageEngineIntegrationTest.java | 7 +-
.../query/parser/CalciteRexExpressionParser.java | 34 +++-
.../query/planner/hints/PinotRelationalHints.java | 2 +
.../query/planner/logical/RelToStageConverter.java | 8 +
.../pinot/query/planner/logical/RexExpression.java | 17 +-
.../pinot/query/planner/logical/StagePlanner.java | 3 +-
.../partitioning/FieldSelectionKeySelector.java | 2 +-
.../pinot/query/planner/stage/AggregateNode.java | 58 ++++++
.../query/planner/stage/StageNodeSerDeUtils.java | 2 +
.../PinotAggregateExchangeNodeInsertRule.java | 181 ++++++++++++++++++
...e.java => PinotJoinExchangeNodeInsertRule.java} | 10 +-
.../pinot/query/rules/PinotQueryRuleSets.java | 3 +-
.../pinot/query/QueryEnvironmentTestBase.java | 10 +-
.../runtime/executor/WorkerQueryExecutor.java | 7 +
.../query/runtime/operator/AggregateOperator.java | 211 +++++++++++++++++++++
.../query/runtime/utils/ServerRequestUtils.java | 10 +-
.../apache/pinot/query/QueryServerEnclosure.java | 2 +-
.../pinot/query/runtime/QueryRunnerTest.java | 22 ++-
19 files changed, 565 insertions(+), 26 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index 32afe620fc..4f0f7bcca8 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -114,7 +114,7 @@ public class RequestUtils {
}
public static Expression getFunctionExpression(String canonicalName) {
- assert
canonicalName.equals(canonicalizeFunctionNamePreservingSpecialKey(canonicalName));
+ assert
canonicalName.equalsIgnoreCase(canonicalizeFunctionNamePreservingSpecialKey(canonicalName));
Expression expression = new Expression(ExpressionType.FUNCTION);
Function function = new Function(canonicalName);
expression.setFunctionCall(function);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 5d149de4ed..0d78d2b182 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -113,10 +113,11 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTest
@DataProvider
public Object[][] multiStageQueryEngineSqlTestSet() {
return new Object[][] {
- new Object[]{"SELECT * FROM mytable_OFFLINE", 10, 73},
+ new Object[]{"SELECT COUNT(*) FROM mytable_OFFLINE WHERE
Carrier='AA'", 1, 1},
+ new Object[]{"SELECT * FROM mytable_OFFLINE WHERE ArrDelay>1000", 2,
73},
new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE WHERE
CarrierDelay=15 AND ArrDelay>20", 10, 2},
- new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE
AS b ON a.AirlineID = b.AirlineID "
- + " WHERE a.CarrierDelay=15 AND a.ArrDelay>20 AND b.ArrDelay<20",
10, 146}
+ new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE
AS b ON a.Origin = b.Origin "
+ + " WHERE a.Carrier='AA' AND a.ArrDelay>1000 AND b.ArrDelay>1000",
2, 146}
};
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
index a4f5753557..70ee68b21c 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
@@ -53,8 +53,13 @@ public class CalciteRexExpressionParser {
// Relational conversion Utils
// --------------------------------------------------------------------------
- public static List<Expression> convertSelectList(List<RexExpression>
rexNodeList, PinotQuery pinotQuery) {
- List<Expression> selectExpr = new ArrayList<>();
+ public static List<Expression> overwriteSelectList(List<RexExpression>
rexNodeList, PinotQuery pinotQuery) {
+ return addSelectList(new ArrayList<>(), rexNodeList, pinotQuery);
+ }
+
+ public static List<Expression> addSelectList(List<Expression> existingList,
List<RexExpression> rexNodeList,
+ PinotQuery pinotQuery) {
+ List<Expression> selectExpr = new ArrayList<>(existingList);
final Iterator<RexExpression> iterator = rexNodeList.iterator();
while (iterator.hasNext()) {
@@ -65,6 +70,18 @@ public class CalciteRexExpressionParser {
return selectExpr;
}
+ public static List<Expression> convertGroupByList(List<RexExpression>
rexNodeList, PinotQuery pinotQuery) {
+ List<Expression> groupByExpr = new ArrayList<>();
+
+ final Iterator<RexExpression> iterator = rexNodeList.iterator();
+ while (iterator.hasNext()) {
+ final RexExpression next = iterator.next();
+ groupByExpr.add(toExpression(next, pinotQuery));
+ }
+
+ return groupByExpr;
+ }
+
private static List<Expression>
convertDistinctSelectList(RexExpression.FunctionCall rexCall, PinotQuery
pinotQuery) {
List<Expression> selectExpr = new ArrayList<>();
selectExpr.add(convertDistinctAndSelectListToFunctionExpression(rexCall,
pinotQuery));
@@ -169,7 +186,7 @@ public class CalciteRexExpressionParser {
operands.add(toExpression(childNode, pinotQuery));
}
ParserUtils.validateFunction(functionName, operands);
- Expression functionExpression =
RequestUtils.getFunctionExpression(functionName);
+ Expression functionExpression =
RequestUtils.getFunctionExpression(canonicalizeFunctionName(functionName));
functionExpression.getFunctionCall().setOperands(operands);
return functionExpression;
}
@@ -209,4 +226,15 @@ public class CalciteRexExpressionParser {
andExpression.getFunctionCall().setOperands(operands);
return andExpression;
}
+
+ /**
+ * Canonicalize Calcite generated Logical function names.
+ */
+ private static String canonicalizeFunctionName(String functionName) {
+ if (functionName.endsWith("0")) {
+ return functionName.substring(0, functionName.length() - 1);
+ } else {
+ return functionName;
+ }
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java
index 19a9daa54f..2c4cb976a6 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/hints/PinotRelationalHints.java
@@ -27,6 +27,8 @@ import org.apache.calcite.rel.hint.RelHint;
public class PinotRelationalHints {
public static final RelHint USE_HASH_DISTRIBUTE =
RelHint.builder("USE_HASH_DISTRIBUTE").build();
public static final RelHint USE_BROADCAST_DISTRIBUTE =
RelHint.builder("USE_BROADCAST_DISTRIBUTE").build();
+ public static final RelHint AGG_INTERMEDIATE_STAGE =
RelHint.builder("AGG_INTERMEDIATE_STAGE").build();
+ public static final RelHint AGG_LEAF_STAGE =
RelHint.builder("AGG_LEAF_STAGE").build();
private PinotRelationalHints() {
// do not instantiate.
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index bc6d7dc4ca..23f8fb6db8 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
@@ -32,6 +33,7 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
@@ -65,11 +67,17 @@ public final class RelToStageConverter {
return convertLogicalProject((LogicalProject) node, currentStageId);
} else if (node instanceof LogicalFilter) {
return convertLogicalFilter((LogicalFilter) node, currentStageId);
+ } else if (node instanceof LogicalAggregate) {
+ return convertLogicalAggregate((LogicalAggregate) node, currentStageId);
} else {
throw new UnsupportedOperationException("Unsupported logical plan node:
" + node);
}
}
+ private static StageNode convertLogicalAggregate(LogicalAggregate node, int
currentStageId) {
+ return new AggregateNode(currentStageId, node.getAggCallList(),
node.getGroupSet());
+ }
+
private static StageNode convertLogicalProject(LogicalProject node, int
currentStageId) {
return new ProjectNode(currentStageId, node.getProjects());
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
index 17e472c811..778907e40a 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.logical;
import java.math.BigDecimal;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
@@ -62,18 +63,24 @@ public interface RexExpression {
}
}
+ static RexExpression toRexExpression(AggregateCall aggCall) {
+ List<RexExpression> operands =
aggCall.getArgList().stream().map(InputRef::new).collect(Collectors.toList());
+ return new RexExpression.FunctionCall(aggCall.getAggregation().getKind(),
toDataType(aggCall.getType()),
+ aggCall.getAggregation().getName(), operands);
+ }
+
static Object toRexValue(FieldSpec.DataType dataType, Comparable value) {
switch (dataType) {
case INT:
- return ((BigDecimal) value).intValue();
+ return value == null ? 0 : ((BigDecimal) value).intValue();
case LONG:
- return ((BigDecimal) value).longValue();
+ return value == null ? 0L : ((BigDecimal) value).longValue();
case FLOAT:
- return ((BigDecimal) value).floatValue();
+ return value == null ? 0f : ((BigDecimal) value).floatValue();
case DOUBLE:
- return ((BigDecimal) value).doubleValue();
+ return value == null ? 0d : ((BigDecimal) value).doubleValue();
case STRING:
- return ((NlsString) value).getValue();
+ return value == null ? "" : ((NlsString) value).getValue();
default:
return value;
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index db7fd63d83..8d9bbbc51d 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -99,13 +99,14 @@ public class StagePlanner {
// 1. exchangeNode always have only one input, get its input converted
as a new stage root.
StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
RelDistribution distribution = ((LogicalExchange)
node).getDistribution();
+ List<Integer> distributionKeys = distribution.getKeys();
RelDistribution.Type exchangeType = distribution.getType();
// 2. make an exchange sender and receiver node pair
StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId,
nextStageRoot.getStageId(), exchangeType);
StageNode mailboxSender = new
MailboxSendNode(nextStageRoot.getStageId(), mailboxReceiver.getStageId(),
exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
- ? new FieldSelectionKeySelector(distribution.getKeys().get(0)) :
null);
+ ? new FieldSelectionKeySelector(distributionKeys) : null);
mailboxSender.addInput(nextStageRoot);
// 3. put the sender side as a completed stage.
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
index 674cc8e2a2..fd04ca589e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
@@ -71,6 +71,6 @@ public class FieldSelectionKeySelector implements
KeySelector<Object[], Object[]
for (int columnIndex : _columnIndices) {
hashCodeBuilder.append(input[columnIndex]);
}
- return hashCodeBuilder.toHashCode();
+ return Math.abs(hashCodeBuilder.toHashCode());
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
new file mode 100644
index 0000000000..ae41d14a79
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.serde.ProtoProperties;
+
+
+public class AggregateNode extends AbstractStageNode {
+ @ProtoProperties
+ private List<RexExpression> _aggCalls;
+ @ProtoProperties
+ private List<RexExpression> _groupSet;
+
+ public AggregateNode(int stageId) {
+ super(stageId);
+ }
+
+ public AggregateNode(int stageId, List<AggregateCall> aggCalls,
ImmutableBitSet groupSet) {
+ super(stageId);
+ _aggCalls =
aggCalls.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
+ _groupSet = new ArrayList<>(groupSet.cardinality());
+ Iterator<Integer> groupSetIt = groupSet.iterator();
+ while (groupSetIt.hasNext()) {
+ _groupSet.add(new RexExpression.InputRef(groupSetIt.next()));
+ }
+ }
+
+ public List<RexExpression> getAggCalls() {
+ return _aggCalls;
+ }
+
+ public List<RexExpression> getGroupSet() {
+ return _groupSet;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
index 3d34f6effb..8d341a207c 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
@@ -56,6 +56,8 @@ public final class StageNodeSerDeUtils {
return new ProjectNode(stageId);
case "FilterNode":
return new FilterNode(stageId);
+ case "AggregateNode":
+ return new AggregateNode(stageId);
case "MailboxSendNode":
return new MailboxSendNode(stageId);
case "MailboxReceiveNode":
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
new file mode 100644
index 0000000000..1150bc2085
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.rules;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.pinot.query.planner.hints.PinotRelationalHints;
+
+
+/**
+ * Special rule for Pinot, this rule is fixed to generate a 2-stage
aggregation split between the
+ * (1) non-data-local Pinot server agg stage, and (2) the data-local Pinot
intermediate agg stage.
+ *
+ * Pinot uses special intermediate data representation for partially
aggregated results, thus we can't use
+ * {@link org.apache.calcite.rel.rules.AggregateReduceFunctionsRule} to reduce
complex aggregation.
+ *
+ * This rule is here to introduce Pinot-specific aggregation splits.
In general, all aggregations are split into
+ * intermediate-stage AGG; and server-stage AGG with the same naming. E.g.
+ *
+ * COUNT(*) transforms into: COUNT(*)_SERVER --> COUNT(*)_INTERMEDIATE, where
+ * COUNT(*)_SERVER produces TUPLE[ SUM(1), GROUP_BY_KEY ]
+ * COUNT(*)_INTERMEDIATE produces TUPLE[ SUM(COUNT(*)_SERVER), GROUP_BY_KEY ]
+ *
+ * However, the suffix _SERVER/_INTERMEDIATE is merely a SQL hint to the
Aggregate operator and will be translated
+ * into the correct, actual operator chain during physical planning.
+ */
+public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
+ public static final PinotAggregateExchangeNodeInsertRule INSTANCE =
+ new PinotAggregateExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER);
+
+ public PinotAggregateExchangeNodeInsertRule(RelBuilderFactory factory) {
+ super(operand(LogicalAggregate.class, any()), factory, null);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ if (call.rels.length < 1) {
+ return false;
+ }
+ if (call.rel(0) instanceof Aggregate) {
+ Aggregate agg = call.rel(0);
+ return !agg.getHints().contains(PinotRelationalHints.AGG_LEAF_STAGE)
+ &&
!agg.getHints().contains(PinotRelationalHints.AGG_INTERMEDIATE_STAGE);
+ }
+ return false;
+ }
+
+ /**
+ * Split the AGG into 2 stages, both with the same AGG type.
+ * Pinot internal stage optimization can use the info of the input data type
to infer whether it should generate
+ * the "intermediate-stage AGG operator" or a "leaf-stage AGG operator"
+ * @see org.apache.pinot.core.query.aggregation.function.AggregationFunction
+ *
+ * @param call the {@link RelOptRuleCall} on match.
+ */
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ Aggregate oldAggRel = call.rel(0);
+ ImmutableList<RelHint> orgHints = oldAggRel.getHints();
+
+ // 1. attach leaf agg RelHint to original agg.
+ ImmutableList<RelHint> newLeafAggHints =
+ new
ImmutableList.Builder<RelHint>().addAll(orgHints).add(PinotRelationalHints.AGG_LEAF_STAGE).build();
+ Aggregate newLeafAgg =
+ new LogicalAggregate(oldAggRel.getCluster(), oldAggRel.getTraitSet(),
newLeafAggHints, oldAggRel.getInput(),
+ oldAggRel.getGroupSet(), oldAggRel.getGroupSets(),
oldAggRel.getAggCallList());
+
+ // 2. attach exchange.
+ List<Integer> groupSetIndices = ImmutableIntList.range(0,
oldAggRel.getGroupCount());
+ LogicalExchange exchange = null;
+ if (groupSetIndices.size() == 0) {
+ exchange = LogicalExchange.create(newLeafAgg,
RelDistributions.SINGLETON);
+ } else {
+ exchange = LogicalExchange.create(newLeafAgg,
RelDistributions.hash(groupSetIndices));
+ }
+
+ // 3. attach intermediate agg stage.
+ RelNode newAggNode = makeNewIntermediateAgg(call, oldAggRel, exchange);
+ call.transformTo(newAggNode);
+ }
+
+ private RelNode makeNewIntermediateAgg(RelOptRuleCall ruleCall, Aggregate
oldAggRel, LogicalExchange exchange) {
+
+ // add the exchange as the input node to the relation builder.
+ RelBuilder relBuilder = ruleCall.builder();
+ relBuilder.push(exchange);
+ List<RexNode> inputExprs = new ArrayList<>(relBuilder.fields());
+
+ // make input ref to the exchange after the leaf aggregate.
+ RexBuilder rexBuilder = exchange.getCluster().getRexBuilder();
+ final int nGroups = oldAggRel.getGroupCount();
+ for (int i = 0; i < nGroups; i++) {
+ rexBuilder.makeInputRef(oldAggRel, i);
+ }
+
+ // create new aggregate function calls from exchange input.
+ List<AggregateCall> oldCalls = oldAggRel.getAggCallList();
+ List<AggregateCall> newCalls = new ArrayList<>();
+ Map<AggregateCall, RexNode> aggCallMapping = new HashMap<>();
+
+ for (int oldCallIndex = 0; oldCallIndex < oldCalls.size(); oldCallIndex++)
{
+ AggregateCall oldCall = oldCalls.get(oldCallIndex);
+ convertAggCall(rexBuilder, oldAggRel, oldCallIndex, oldCall, newCalls,
aggCallMapping, inputExprs);
+ }
+
+ // create new aggregate relation.
+ ImmutableList<RelHint> orgHints = oldAggRel.getHints();
+ ImmutableList<RelHint> newIntermediateAggHints =
+ new
ImmutableList.Builder<RelHint>().addAll(orgHints).add(PinotRelationalHints.AGG_INTERMEDIATE_STAGE).build();
+ ImmutableBitSet groupSet = ImmutableBitSet.range(nGroups);
+ relBuilder.aggregate(
+ relBuilder.groupKey(groupSet, ImmutableList.of(groupSet)),
+ newCalls);
+ relBuilder.hints(newIntermediateAggHints);
+ return relBuilder.build();
+ }
+
+ /**
+ * convert aggregate call based on the intermediate stage input.
+ *
+ * <p>Note that the intermediate stage input only supports splittable
aggregators such as SUM/MIN/MAX.
+ * All non-splittable aggregators must be converted into splittable
aggregators first.
+ */
+ private static void convertAggCall(RexBuilder rexBuilder, Aggregate
oldAggRel, int oldCallIndex,
+ AggregateCall oldCall, List<AggregateCall> newCalls, Map<AggregateCall,
RexNode> aggCallMapping,
+ List<RexNode> inputExprs) {
+ final int nGroups = oldAggRel.getGroupCount();
+ AggregateCall newCall = AggregateCall.create(
+ oldCall.getAggregation(), oldCall.isDistinct(),
oldCall.isApproximate(), oldCall.ignoreNulls(),
+ convertArgList(nGroups + oldCallIndex, oldCall.getArgList()),
oldCall.filterArg, oldCall.distinctKeys,
+ oldCall.collation, oldCall.type, oldCall.getName());
+ rexBuilder.addAggCall(newCall,
+ nGroups,
+ newCalls,
+ aggCallMapping,
+ oldAggRel.getInput()::fieldIsNullable);
+ }
+
+ private static List<Integer> convertArgList(int oldCallIndexWithShift,
List<Integer> argList) {
+ Preconditions.checkArgument(argList.size() <= 1,
+ "Unable to convert call as the argList contains more than 1 argument");
+ return argList.size() == 1 ?
Collections.singletonList(oldCallIndexWithShift) : Collections.emptyList();
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
similarity index 91%
rename from
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
rename to
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
index 2a1b740669..6aaacccac1 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
@@ -38,13 +38,13 @@ import
org.apache.pinot.query.planner.hints.PinotRelationalHints;
/**
- * Special rule for Pinot, always insert exchange after JOIN
+ * Special rule for Pinot, this rule is fixed to always insert exchange after
JOIN node.
*/
-public class PinotExchangeNodeInsertRule extends RelOptRule {
- public static final PinotExchangeNodeInsertRule INSTANCE =
- new PinotExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER);
+public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
+ public static final PinotJoinExchangeNodeInsertRule INSTANCE =
+ new PinotJoinExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER);
- public PinotExchangeNodeInsertRule(RelBuilderFactory factory) {
+ public PinotJoinExchangeNodeInsertRule(RelBuilderFactory factory) {
super(operand(LogicalJoin.class, any()), factory, null);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
index 63c2fd799f..1f3759b7b4 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
@@ -89,5 +89,6 @@ public class PinotQueryRuleSets {
PruneEmptyRules.UNION_INSTANCE,
// Pinot specific rules
- PinotExchangeNodeInsertRule.INSTANCE);
+ PinotJoinExchangeNodeInsertRule.INSTANCE,
+ PinotAggregateExchangeNodeInsertRule.INSTANCE);
}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 176bf3edc8..1c6ad7ff29 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -47,7 +47,15 @@ public class QueryEnvironmentTestBase {
new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3
>= 0"},
new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 =
b.col2"},
new Object[]{"SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 =
b.col2 "
- + "WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0"},
+ + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0"},
+ new Object[]{"SELECT a.col1, a.col3 + a.ts FROM a WHERE a.col3 >= 0
AND a.col2 = 'a'"},
+ new Object[]{"SELECT SUM(a.col3), COUNT(*) FROM a WHERE a.col3 >= 0
AND a.col2 = 'a'"},
+ new Object[]{"SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 AND
a.col2 = 'a' GROUP BY a.col1"},
+ new Object[]{"SELECT a.col1, AVG(a.col3) FROM a WHERE a.col3 >= 0 AND
a.col2 = 'a' GROUP BY a.col1"},
+ new Object[]{"SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3
>= 0 AND a.col1 = 'a' "
+ + " GROUP BY a.col1, a.col2"},
+ new Object[]{"SELECT a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 =
b.col2 "
+ + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY
a.col1"},
};
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 01a97643ab..379d7bdbd6 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -30,6 +30,7 @@ import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -38,6 +39,7 @@ import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.AggregateOperator;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
@@ -117,6 +119,11 @@ public class WorkerQueryExecutor {
BaseOperator<TransferableBlock> leftOperator = getOperator(requestId,
joinNode.getInputs().get(0), metadataMap);
BaseOperator<TransferableBlock> rightOperator = getOperator(requestId,
joinNode.getInputs().get(1), metadataMap);
return new HashJoinOperator(leftOperator, rightOperator,
joinNode.getCriteria());
+ } else if (stageNode instanceof AggregateNode) {
+ AggregateNode aggregateNode = (AggregateNode) stageNode;
+ BaseOperator<TransferableBlock> inputOperator =
+ getOperator(requestId, aggregateNode.getInputs().get(0),
metadataMap);
+ return new AggregateOperator(inputOperator, aggregateNode.getAggCalls(),
aggregateNode.getGroupSet());
} else if (stageNode instanceof FilterNode) {
throw new UnsupportedOperationException("Unsupported!");
} else if (stageNode instanceof ProjectNode) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
new file mode 100644
index 0000000000..95a5dcd03f
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import
org.apache.pinot.core.query.aggregation.function.CountAggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.MaxAggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.MinAggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.SumAggregationFunction;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+/**
+ * Intermediate-stage aggregation operator.
+ *
+ * <p>The operator drains its input operator, groups the incoming rows by the
+ * columns referenced in the group set, and merges the value found at column
+ * {@code groupSet.size() + i} of every row into the running result of the
+ * i-th aggregation call. Once the input reports end-of-stream it emits a
+ * single block with one row per group (group key values first, merged
+ * aggregation values after them); every later call returns an end-of-stream
+ * block.
+ *
+ * <p>The result block reuses the schema of the first input data block.
+ */
+public class AggregateOperator extends BaseOperator<TransferableBlock> {
+  private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+
+  private final BaseOperator<TransferableBlock> _inputOperator;
+  private final List<RexExpression> _aggCalls;
+  private final List<RexExpression> _groupSet;
+
+  private final AggregationFunction[] _aggregationFunctions;
+  // One map per aggregation call: group key -> merged value so far.
+  // Keyed by the Key object (not by its hash code) so that two distinct
+  // groups whose hash codes collide are never folded into one group.
+  private final Map<Key, Object>[] _groupByResultHolders;
+  // Every group key seen so far, mapped to its key column values.
+  private final Map<Key, Object[]> _groupByKeyHolder;
+
+  private DataSchema _dataSchema;
+  private boolean _isCumulativeBlockConstructed;
+
+  // TODO: refactor Pinot Reducer code to support the intermediate stage agg
+  // operator.
+  /**
+   * @param inputOperator operator producing the blocks to aggregate
+   * @param aggCalls aggregation calls; each must be a
+   *     {@link RexExpression.FunctionCall} with a supported function name
+   * @param groupSet group-by keys; each is read as a
+   *     {@link RexExpression.InputRef} into the input row
+   * @throws IllegalStateException if an aggregation call is not a function
+   *     call or names an unsupported function
+   */
+  public AggregateOperator(BaseOperator<TransferableBlock> inputOperator,
+      List<RexExpression> aggCalls, List<RexExpression> groupSet) {
+    _inputOperator = inputOperator;
+    _aggCalls = aggCalls;
+    _groupSet = groupSet;
+
+    _aggregationFunctions = new AggregationFunction[_aggCalls.size()];
+    _groupByResultHolders = new Map[_aggCalls.size()];
+    _groupByKeyHolder = new HashMap<Key, Object[]>();
+    for (int i = 0; i < aggCalls.size(); i++) {
+      _aggregationFunctions[i] = toAggregationFunction(aggCalls.get(i));
+      _groupByResultHolders[i] = new HashMap<Key, Object>();
+    }
+
+    _isCumulativeBlockConstructed = false;
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    // WorkerExecutor doesn't use getChildOperators, returns null here.
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Returns the aggregated result block on the first call and an
+   * end-of-stream block afterwards. Any exception is converted into an
+   * error block instead of being propagated.
+   */
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      // Drain the input only while the result has not been emitted yet; once
+      // it has, the input already reported end-of-stream and is not polled
+      // again.
+      if (!_isCumulativeBlockConstructed) {
+        cumulateAggregationBlocks();
+      }
+      return new TransferableBlock(toResultBlock());
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  /**
+   * Builds the result block from the accumulated groups, or returns an
+   * end-of-stream block if the result was already produced.
+   */
+  private BaseDataBlock toResultBlock()
+      throws IOException {
+    if (_isCumulativeBlockConstructed) {
+      return DataBlockUtils.getEndOfStreamDataBlock();
+    }
+    int numKeyColumns = _groupSet.size();
+    List<Object[]> rows = new ArrayList<>(_groupByKeyHolder.size());
+    for (Map.Entry<Key, Object[]> entry : _groupByKeyHolder.entrySet()) {
+      Object[] row = new Object[numKeyColumns + _aggregationFunctions.length];
+      Object[] keyElements = entry.getValue();
+      System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+      for (int i = 0; i < _groupByResultHolders.length; i++) {
+        row[numKeyColumns + i] = _groupByResultHolders[i].get(entry.getKey());
+      }
+      rows.add(row);
+    }
+    _isCumulativeBlockConstructed = true;
+    if (rows.isEmpty()) {
+      // NOTE(review): _dataSchema is still null here when the input never
+      // produced a data block - confirm getEmptyDataBlock accepts null.
+      return DataBlockUtils.getEmptyDataBlock(_dataSchema);
+    }
+    return DataBlockBuilder.buildFromRows(rows, null, _dataSchema);
+  }
+
+  /**
+   * Reads input blocks until end-of-stream, folding every row into the
+   * per-group running results.
+   */
+  private void cumulateAggregationBlocks() {
+    int numKeyColumns = _groupSet.size();
+    TransferableBlock block = _inputOperator.nextBlock();
+    while (!TransferableBlockUtils.isEndOfStream(block)) {
+      BaseDataBlock dataBlock = block.getDataBlock();
+      if (_dataSchema == null) {
+        _dataSchema = dataBlock.getDataSchema();
+      }
+      int numRows = dataBlock.getNumberOfRows();
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        Object[] row =
+            SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+        Key key = extractRowKey(row, _groupSet);
+        _groupByKeyHolder.put(key, key.getValues());
+        for (int i = 0; i < _aggregationFunctions.length; i++) {
+          Object incoming = row[numKeyColumns + i];
+          Object current = _groupByResultHolders[i].get(key);
+          // The first value seen for a group is stored as-is; later values
+          // are merged into it.
+          _groupByResultHolders[i].put(key,
+              current == null ? incoming
+                  : merge(_aggCalls.get(i), current, incoming));
+        }
+      }
+      block = _inputOperator.nextBlock();
+    }
+  }
+
+  /**
+   * Maps an aggregation call to the matching aggregation function.
+   *
+   * @throws IllegalStateException if the call is not a function call or its
+   *     function name is not supported
+   */
+  private AggregationFunction toAggregationFunction(RexExpression aggCall) {
+    Preconditions.checkState(aggCall instanceof RexExpression.FunctionCall);
+    RexExpression.FunctionCall call = (RexExpression.FunctionCall) aggCall;
+    switch (call.getFunctionName()) {
+      case "$SUM":
+      case "$SUM0":
+        return new SumAggregationFunction(firstOperandAsIdentifier(call));
+      case "$COUNT":
+      case "COUNT":
+        return new CountAggregationFunction();
+      case "$MIN":
+      case "$MIN0":
+        return new MinAggregationFunction(firstOperandAsIdentifier(call));
+      case "$MAX":
+      case "$MAX0":
+        return new MaxAggregationFunction(firstOperandAsIdentifier(call));
+      default:
+        throw new IllegalStateException(
+            "Unexpected value: " + call.getFunctionName());
+    }
+  }
+
+  /** Wraps the string form of the call's first operand as an identifier. */
+  private static ExpressionContext firstOperandAsIdentifier(
+      RexExpression.FunctionCall call) {
+    return ExpressionContext.forIdentifier(
+        call.getFunctionOperands().get(0).toString());
+  }
+
+  /**
+   * Merges two partial results of the given aggregation call.
+   *
+   * @throws IllegalStateException if the call is not a function call or its
+   *     function name is not supported
+   */
+  private Object merge(RexExpression aggCall, Object left, Object right) {
+    Preconditions.checkState(aggCall instanceof RexExpression.FunctionCall);
+    RexExpression.FunctionCall call = (RexExpression.FunctionCall) aggCall;
+    switch (call.getFunctionName()) {
+      case "$SUM":
+      case "$SUM0":
+        return (double) left + (double) right;
+      case "$COUNT":
+      // "COUNT" is accepted by toAggregationFunction, so it has to be
+      // mergeable as well.
+      case "COUNT":
+        return mergeCount(left, right);
+      case "$MIN":
+      case "$MIN0":
+        return Math.min((double) left, (double) right);
+      case "$MAX":
+      case "$MAX0":
+        return Math.max((double) left, (double) right);
+      default:
+        throw new IllegalStateException(
+            "Unexpected value: " + call.getFunctionName());
+    }
+  }
+
+  /**
+   * Adds two partial counts. Two Integer inputs produce an Integer; any
+   * other numeric combination is added as long so that boxed Long counts do
+   * not fail with a ClassCastException.
+   */
+  private static Object mergeCount(Object left, Object right) {
+    if (left instanceof Integer && right instanceof Integer) {
+      return (int) left + (int) right;
+    }
+    // NOTE(review): assumes the count column of the result schema accepts a
+    // Long in this case - confirm against the leaf-stage COUNT column type.
+    return ((Number) left).longValue() + ((Number) right).longValue();
+  }
+
+  /**
+   * Builds the group key of a row from the input columns referenced by the
+   * group set.
+   */
+  private static Key extractRowKey(Object[] row, List<RexExpression> groupSet) {
+    Object[] keyElements = new Object[groupSet.size()];
+    for (int i = 0; i < groupSet.size(); i++) {
+      keyElements[i] =
+          row[((RexExpression.InputRef) groupSet.get(i)).getIndex()];
+    }
+    return new Key(keyElements);
+  }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 86ec13adcf..15a8b0194c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.request.QuerySource;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
@@ -104,8 +105,15 @@ public class ServerRequestUtils {
pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression(
((FilterNode) node).getCondition(), pinotQuery));
} else if (node instanceof ProjectNode) {
- pinotQuery.setSelectList(CalciteRexExpressionParser.convertSelectList(
+ pinotQuery.setSelectList(CalciteRexExpressionParser.overwriteSelectList(
((ProjectNode) node).getProjects(), pinotQuery));
+ } else if (node instanceof AggregateNode) {
+ // set agg list
+
pinotQuery.setSelectList(CalciteRexExpressionParser.addSelectList(pinotQuery.getSelectList(),
+ ((AggregateNode) node).getAggCalls(), pinotQuery));
+ // set group-by list
+ pinotQuery.setGroupByList(CalciteRexExpressionParser.convertGroupByList(
+ ((AggregateNode) node).getGroupSet(), pinotQuery));
} else if (node instanceof MailboxSendNode) {
// TODO: MailboxSendNode should be the root of the leaf stage, but
ignore for now since it is handled separately
// in QueryRunner as a single step sender.
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 0e60a4a533..2ef4958432 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -142,7 +142,7 @@ public class QueryServerEnclosure {
for (int i = 0; i < NUM_ROWS; i++) {
GenericRow row = new GenericRow();
row.putValue("col1", STRING_FIELD_LIST[i % STRING_FIELD_LIST.length]);
- row.putValue("col2", STRING_FIELD_LIST[(i + 2) %
STRING_FIELD_LIST.length]);
+ row.putValue("col2", STRING_FIELD_LIST[i % (STRING_FIELD_LIST.length -
2)]);
row.putValue("col3", INT_FIELD_LIST[i % INT_FIELD_LIST.length]);
row.putValue("ts", System.currentTimeMillis());
rows.add(row);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 6ae43ce0cd..b409a4b063 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -153,11 +153,15 @@ public class QueryRunnerTest {
// Next join with table C which has (5 on server1 and 10 on server2),
since data is identical. each of the row
// of the A JOIN B will have identical value of col3 as table C.col3
has. Since the values are cycling between
// (1, 2, 42, 1, 2). we will have 6 1s, 6 2s, and 3 42s, total result
count will be 36 + 36 + 9 = 81
- new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 JOIN c ON
a.col3 = c.col3", 81},
+ new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON
a.col3 = c.col3", 81},
// Specifically table A has 15 rows (10 on server1 and 5 on server2)
and table B has 5 rows (all on server1),
// thus the final JOIN result will be 15 x 1 = 15.
- new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col2", 15},
+ new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1", 15},
+
+ // Specifically table A has 15 rows (10 on server1 and 5 on server2)
and table B has 5 rows (all on server1),
+ // thus the final JOIN result will be 15 x 1 = 15.
+ new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 =
b.col2", 15},
// Specifically table A has 15 rows (10 on server1 and 5 on server2)
and table B has 5 rows (all on server1),
// thus the final JOIN result will be 15 x 1 = 15.
@@ -167,7 +171,19 @@ public class QueryRunnerTest {
// but only 1 out of 5 rows from table A will be selected out; and all
in table B will be selected.
// thus the final JOIN result will be 1 x 3 x 1 = 3.
new Object[]{"SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON
a.col1 = b.col2 "
- + " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0", 3},
+ + " WHERE a.col3 >= 0 AND a.col2 = 'alice' AND b.col3 >= 0", 3},
+
+ // Projection pushdown
+ new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0
AND a.col2 = 'alice'", 3},
+
+ // Aggregation with group by
+ new Object[]{"SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0
GROUP BY a.col1", 5},
+
+ // Aggregation with multiple group key
+ new Object[]{"SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3
>= 0 GROUP BY a.col1, a.col2", 5},
+
+ // Aggregation without group by
+ new Object[]{"SELECT COUNT(*) FROM a WHERE a.col3 >= 0 AND a.col2 =
'alice'", 1},
};
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]