mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r325807516
########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java ########## @@ -392,6 +434,218 @@ private void setAggregationResults(@Nonnull BrokerResponseNative brokerResponseN brokerResponseNative.setAggregationResults(reducedAggregationResults); } + /** + * Extract group by order by results and set into {@link ResultTable} + * @param brokerResponseNative broker response + * @param dataSchema data schema + * @param aggregationInfos aggregations info + * @param groupBy group by info + * @param orderBy order by info + * @param dataTableMap map from server to data table + */ + private void setSQLGroupByOrderByResults(@Nonnull BrokerResponseNative brokerResponseNative, + @Nonnull DataSchema dataSchema, @Nonnull List<AggregationInfo> aggregationInfos, @Nonnull GroupBy groupBy, + @Nonnull List<SelectionSort> orderBy, @Nonnull Map<ServerInstance, DataTable> dataTableMap, + boolean preserveType) { + + List<String> columns = new ArrayList<>(dataSchema.size()); + for (int i = 0; i < dataSchema.size(); i++) { + columns.add(dataSchema.getColumnName(i)); + } + + int numGroupBy = groupBy.getExpressionsSize(); + int numAggregations = aggregationInfos.size(); + + IndexedTable indexedTable; + try { + indexedTable = + getIndexedTable(numGroupBy, numAggregations, groupBy, aggregationInfos, orderBy, dataSchema, dataTableMap); + } catch (Throwable throwable) { + throw new IllegalStateException(throwable); + } + + List<AggregationFunction> aggregationFunctions = new ArrayList<>(aggregationInfos.size()); + for (AggregationInfo aggregationInfo : aggregationInfos) { + aggregationFunctions.add( + AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction()); + } + + List<Serializable[]> rows = new ArrayList<>(); + int numColumns = columns.size(); + Iterator<Record> sortedIterator = indexedTable.iterator(); + int numRows = 0; + while (numRows < groupBy.getTopN() && sortedIterator.hasNext()) { + + Record nextRecord = sortedIterator.next(); + Serializable[] row = new Serializable[numColumns]; + int index = 0; + for (Object keyColumn : nextRecord.getKey().getColumns()) { + row[index ++] = getSerializableValue(keyColumn); + } + int aggNum = 0; + for (Object valueColumn : nextRecord.getValues()) { + row[index] = getSerializableValue(aggregationFunctions.get(aggNum).extractFinalResult(valueColumn)); + if (preserveType) { + row[index] = AggregationFunctionUtils.formatValue(row[index]); + } + index ++; + } + rows.add(row); + numRows++; + } + + brokerResponseNative.setResultTable(new ResultTable(columns, rows)); + } + + private IndexedTable getIndexedTable(int numGroupBy, int numAggregations, GroupBy groupBy, + List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy, DataSchema dataSchema, Map<ServerInstance, DataTable> dataTableMap) + throws Throwable { + + IndexedTable indexedTable = new ConcurrentIndexedTable(); + // setting a higher value to avoid frequent resizing + int capacity = (int) Math.max(groupBy.getTopN(), 10000); + indexedTable.init(dataSchema, aggregationInfos, orderBy, capacity, true); + + for (DataTable dataTable : dataTableMap.values()) { + CheckedFunction2[] functions = new CheckedFunction2[dataSchema.size()]; + for (int i = 0; i < dataSchema.size(); i++) { + ColumnDataType columnDataType = dataSchema.getColumnDataType(i); + CheckedFunction2<Integer, Integer, Object> function; + switch (columnDataType) { + + case INT: + function = (CheckedFunction2<Integer, Integer, Object>) dataTable::getInt; + break; + case LONG: + function = (CheckedFunction2<Integer, Integer, Object>) dataTable::getLong; + break; + case FLOAT: + function = (CheckedFunction2<Integer, Integer, Object>) dataTable::getFloat; + break; + case DOUBLE: + function = (CheckedFunction2<Integer, Integer, Object>) dataTable::getDouble; + break; + case STRING: + function = (CheckedFunction2<Integer, Integer, Object>) dataTable::getString; + break; + default: + function = (CheckedFunction2<Integer, Integer, Object>) dataTable::getObject; + } + functions[i] = function; + } + + for (int row = 0; row < dataTable.getNumberOfRows(); row++) { + Object[] key = new Object[numGroupBy]; + int col = 0; + for (int j = 0; j < numGroupBy; j++) { + key[j] = functions[col].apply(row, col); + col ++; + } + Object[] value = new Object[numAggregations]; + for (int j = 0; j < numAggregations; j++) { + value[j] = functions[col].apply(row, col); + col ++; + } + Record record = new Record(new Key(key), value); + indexedTable.upsert(record); + } + } + indexedTable.finish(); + return indexedTable; + } + + /** + * Extract the results of group by order by into a List of {@link AggregationResult} + * There will be 1 aggregation result per aggregation. The group by keys will be the same across all aggregations + * @param brokerResponseNative broker response + * @param dataSchema data schema + * @param aggregationInfos aggregations info + * @param groupBy group by info + * @param orderBy order by info + * @param dataTableMap map from server to data table + */ + private void setPQLGroupByOrderByResults(@Nonnull BrokerResponseNative brokerResponseNative, Review comment: Is this needed because we want to support order-by with PQL? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org