npawar commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r333125496
########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java ########## @@ -428,6 +469,217 @@ private void setAggregationResults(BrokerResponseNative brokerResponseNative, } } + /** + * Extract group by order by results and set into {@link ResultTable} + * @param brokerResponseNative broker response + * @param dataSchema data schema + * @param aggregationInfos aggregations info + * @param groupBy group by info + * @param orderBy order by info + * @param dataTableMap map from server to data table + */ + private void setSQLGroupByOrderByResults(BrokerResponseNative brokerResponseNative, DataSchema dataSchema, + List<AggregationInfo> aggregationInfos, GroupBy groupBy, List<SelectionSort> orderBy, + Map<ServerInstance, DataTable> dataTableMap, boolean preserveType) { + + List<String> columns = new ArrayList<>(dataSchema.size()); + for (int i = 0; i < dataSchema.size(); i++) { + columns.add(dataSchema.getColumnName(i)); + } + + int numGroupBy = groupBy.getExpressionsSize(); + int numAggregations = aggregationInfos.size(); + + IndexedTable indexedTable; + try { + indexedTable = + getIndexedTable(numGroupBy, numAggregations, groupBy, aggregationInfos, orderBy, dataSchema, dataTableMap); + } catch (Throwable throwable) { + throw new IllegalStateException(throwable); + } + + List<AggregationFunction> aggregationFunctions = new ArrayList<>(aggregationInfos.size()); + for (AggregationInfo aggregationInfo : aggregationInfos) { + aggregationFunctions.add( + AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction()); + } + + List<Serializable[]> rows = new ArrayList<>(); + int numColumns = columns.size(); + Iterator<Record> sortedIterator = indexedTable.iterator(); + int numRows = 0; + while (numRows < groupBy.getTopN() && sortedIterator.hasNext()) { + + Record nextRecord = sortedIterator.next(); + Serializable[] row = new Serializable[numColumns]; + int index = 0; + for (Object keyColumn : 
nextRecord.getKey().getColumns()) { + row[index ++] = getSerializableValue(keyColumn); + } + int aggNum = 0; + for (Object valueColumn : nextRecord.getValues()) { + row[index] = getSerializableValue(aggregationFunctions.get(aggNum).extractFinalResult(valueColumn)); + if (preserveType) { + row[index] = AggregationFunctionUtils.formatValue(row[index]); + } + index ++; + } + rows.add(row); + numRows++; + } + + brokerResponseNative.setResultTable(new ResultTable(columns, rows)); + } + + private IndexedTable getIndexedTable(int numGroupBy, int numAggregations, GroupBy groupBy, + List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy, DataSchema dataSchema, Map<ServerInstance, DataTable> dataTableMap) + throws Throwable { + + IndexedTable indexedTable = new ConcurrentIndexedTable(); + int indexedTableCapacity = 1_000_000; + // FIXME: indexedTableCapacity should be derived from TOP. Hardcoding this value to a higher number until we can tune the resize + // int capacity = GroupByUtils.getTableCapacity((int) groupBy.getTopN()); + indexedTable.init(dataSchema, aggregationInfos, orderBy, indexedTableCapacity); + + for (DataTable dataTable : dataTableMap.values()) { + CheckedFunction2[] functions = new CheckedFunction2[dataSchema.size()]; Review comment: Changed. Thanks for the suggestion. BiFunction is better. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org