walterddr commented on code in PR #11607:
URL: https://github.com/apache/pinot/pull/11607#discussion_r1329089094
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1065,4 +1066,14 @@ public enum JoinOverFlowMode {
THROW, BREAK
}
}
+
+ public static class NullValuePlaceHolder {
Review Comment:
We already have default null values in `FieldSpec`. Do we plan to do this
differently here? Should we consolidate?
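A minimal sketch of what consolidation could look like: one table of per-type null placeholders that every caller reads, instead of a second set of constants in `CommonConstants`. The names `NullableType` and `NullPlaceholders` are hypothetical, and the values are assumed to resemble Pinot's dimension-column defaults; verify them against `FieldSpec` before relying on them.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical subset of data types that need a null placeholder (assumption).
enum NullableType { INT, LONG, FLOAT, DOUBLE, STRING }

// Hypothetical single source of truth for null placeholders. In Pinot these values
// would be read from FieldSpec rather than redefined here.
final class NullPlaceholders {
  private static final Map<NullableType, Object> DEFAULTS = new EnumMap<>(NullableType.class);

  static {
    // Assumed values, chosen to resemble dimension-column defaults.
    DEFAULTS.put(NullableType.INT, Integer.MIN_VALUE);
    DEFAULTS.put(NullableType.LONG, Long.MIN_VALUE);
    DEFAULTS.put(NullableType.FLOAT, Float.NEGATIVE_INFINITY);
    DEFAULTS.put(NullableType.DOUBLE, Double.NEGATIVE_INFINITY);
    DEFAULTS.put(NullableType.STRING, "null");
  }

  private NullPlaceholders() {
  }

  // Ingestion and the query runtime would both call this, so the values cannot drift apart.
  static Object get(NullableType type) {
    return DEFAULTS.get(type);
  }
}
```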
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -117,31 +96,34 @@ public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inp
_aggCallSignatureMap = Collections.emptyMap();
}
- _hasReturnedAggregateBlock = false;
+ // Initialize the aggregation functions
_colNameToIndexMap = new HashMap<>();
-
- // Convert groupSet to ExpressionContext that our aggregation functions understand.
- List<ExpressionContext> groupByExpr = getGroupSet(groupSet);
-
List<FunctionContext> functionContexts = getFunctionContexts(aggCalls);
- AggregationFunction[] aggFunctions = new AggregationFunction[functionContexts.size()];
-
- for (int i = 0; i < functionContexts.size(); i++) {
+ int numFunctions = functionContexts.size();
+ AggregationFunction<?, ?>[] aggFunctions = new AggregationFunction[numFunctions];
+ for (int i = 0; i < numFunctions; i++) {
aggFunctions[i] =
AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i), true);
}
+ // Process the filter argument indices
+ int[] filterArgIds = new int[numFunctions];
+ int maxFilterArgId = -1;
Review Comment:
Why do we need this extra integer? Wouldn't a null/empty `filterArgIds` array
indicate that there's no filter?
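A minimal sketch of the alternative suggested above, assuming the operator receives a `List<Integer>` of per-function filter argument indices where `-1` means "no filter" (as the updated test's `Collections.singletonList(-1)` suggests). The helper `toFilterArgIds` is hypothetical; it returns `null` when no function has a filter, so callers can check `filterArgIds == null` instead of tracking `maxFilterArgId`. This only holds if `maxFilterArgId` is not needed for anything else.

```java
import java.util.List;

final class FilterArgIds {
  private FilterArgIds() {
  }

  // Hypothetical helper: converts filter argument indices (-1 = no filter) into an int[],
  // or returns null when no aggregation function has a filter.
  static int[] toFilterArgIds(List<Integer> filterArgs, int numFunctions) {
    if (filterArgs == null || filterArgs.isEmpty()) {
      return null;
    }
    if (filterArgs.size() != numFunctions) {
      throw new IllegalArgumentException(
          "Expected " + numFunctions + " filter args, got: " + filterArgs.size());
    }
    int[] ids = new int[numFunctions];
    boolean hasFilter = false;
    for (int i = 0; i < numFunctions; i++) {
      ids[i] = filterArgs.get(i);
      // Any non-negative index means at least one function is filtered.
      hasFilter |= ids[i] >= 0;
    }
    return hasFilter ? ids : null;
  }
}
```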
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java:
##########
@@ -79,7 +79,7 @@ public void shouldHandleUpstreamErrorBlocks() {
DataSchema outSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE});
AggregateOperator operator =
new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, inSchema, calls, group,
- AggType.INTERMEDIATE, null, null);
+ AggType.DIRECT, Collections.singletonList(-1), null);
Review Comment:
Any specific reason we changed the `AggType` here?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -325,57 +291,126 @@ private ExpressionContext convertRexExpressionToExpressionContext(int aggIdx, in
return exprContext;
}
- // TODO: If the previous block is not mailbox received, this method is not efficient. Then getDataBlock() will
- // convert the unserialized format to serialized format of BaseDataBlock. Then it will convert it back to column
- // value primitive type.
- static Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction, TransferableBlock block,
- DataSchema inputDataSchema, Map<String, Integer> colNameToIndexMap, int filterArgIdx) {
+ private int[] getGroupKeyIds(List<RexExpression> groupSet) {
+ int numKeys = groupSet.size();
+ int[] groupKeyIds = new int[numKeys];
+ for (int i = 0; i < numKeys; i++) {
+ RexExpression rexExp = groupSet.get(i);
+ Preconditions.checkState(rexExp.getKind() == SqlKind.INPUT_REF, "Group key must be an input reference, got: %s",
+ rexExp.getKind());
+ groupKeyIds[i] = ((RexExpression.InputRef) rexExp).getIndex();
+ }
+ return groupKeyIds;
+ }
+
+ static RoaringBitmap getMatchedBitmap(TransferableBlock block, int filterArgId) {
+ Preconditions.checkArgument(filterArgId >= 0, "Got negative filter argument id: %s", filterArgId);
+ RoaringBitmap matchedBitmap = new RoaringBitmap();
+ if (block.isContainerConstructed()) {
+ List<Object[]> rows = block.getContainer();
+ int numRows = rows.size();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ if ((int) rows.get(rowId)[filterArgId] == 1) {
+ matchedBitmap.add(rowId);
+ }
+ }
+ } else {
+ DataBlock dataBlock = block.getDataBlock();
+ int numRows = dataBlock.getNumberOfRows();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ if (dataBlock.getInt(rowId, filterArgId) == 1) {
+ matchedBitmap.add(rowId);
+ }
+ }
+ }
+ return matchedBitmap;
+ }
+
+ static Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction<?, ?> aggFunction,
+ TransferableBlock block, Map<String, Integer> colNameToIndexMap) {
List<ExpressionContext> expressions = aggFunction.getInputExpressions();
int numExpressions = expressions.size();
if (numExpressions == 0) {
return Collections.emptyMap();
}
-
Map<ExpressionContext, BlockValSet> blockValSetMap = new HashMap<>();
- for (ExpressionContext expression : expressions) {
- if (expression.getType().equals(ExpressionContext.Type.IDENTIFIER) && !"__PLACEHOLDER__".equals(
Review Comment:
I originally intended to get rid of `__PLACEHOLDER__`. Let's factor this into
a util or leave a TODO so that it's easier to replace in the future (I see 4
`__PLACEHOLDER__` usages in the new impl).
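A minimal sketch of the suggested util: the `__PLACEHOLDER__` literal and its TODO live in one place, and call sites ask `isPlaceholder(...)` instead of comparing strings inline. `PlaceholderUtils` and `resolveColumns` are hypothetical names, and the sketch works on plain identifier strings rather than Pinot's `ExpressionContext` to stay self-contained.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PlaceholderUtils {
  // TODO: remove once aggregation functions no longer need a placeholder argument.
  // Keeping the sentinel here means there is exactly one spot to change later.
  static final String PLACEHOLDER = "__PLACEHOLDER__";

  private PlaceholderUtils() {
  }

  static boolean isPlaceholder(String identifier) {
    return PLACEHOLDER.equals(identifier);
  }

  // Example call site (hypothetical): resolve identifier arguments to input column
  // indices, skipping placeholder arguments instead of checking the literal inline.
  static Map<String, Integer> resolveColumns(List<String> identifiers, Map<String, Integer> colNameToIndexMap) {
    Map<String, Integer> resolved = new LinkedHashMap<>();
    for (String identifier : identifiers) {
      if (isPlaceholder(identifier)) {
        continue;
      }
      Integer index = colNameToIndexMap.get(identifier);
      if (index == null) {
        throw new IllegalArgumentException("Unknown column: " + identifier);
      }
      resolved.put(identifier, index);
    }
    return resolved;
  }
}
```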
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -325,57 +291,126 @@ private ExpressionContext convertRexExpressionToExpressionContext(int aggIdx, in
return exprContext;
}
- // TODO: If the previous block is not mailbox received, this method is not efficient. Then getDataBlock() will
- // convert the unserialized format to serialized format of BaseDataBlock. Then it will convert it back to column
- // value primitive type.
- static Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction, TransferableBlock block,
- DataSchema inputDataSchema, Map<String, Integer> colNameToIndexMap, int filterArgIdx) {
+ private int[] getGroupKeyIds(List<RexExpression> groupSet) {
Review Comment:
nit: why reorder the functions? This is the original `getGroupSet`, right? (And
if not, can't we remove the `convertRexExpressionToExpressionContext` func?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.