npawar commented on a change in pull request #4602: First pass of GROUP BY with 
ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r333125496
 
 

 ##########
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
 ##########
 @@ -428,6 +469,217 @@ private void setAggregationResults(BrokerResponseNative 
brokerResponseNative,
     }
   }
 
+  /**
+   * Extracts the GROUP BY / ORDER BY results from the server responses and sets them on the
+   * broker response as a {@link ResultTable}.
+   *
+   * <p>Each output row holds the group-by key columns first, followed by one final aggregation
+   * value per aggregation function. Rows are emitted in the iteration order of the indexed table,
+   * and at most {@code groupBy.getTopN()} rows are emitted.
+   *
+   * @param brokerResponseNative broker response that receives the result table
+   * @param dataSchema data schema; its column names become the result table columns
+   * @param aggregationInfos aggregations info, one entry per aggregation value column
+   * @param groupBy group by info (supplies the number of group-by expressions and the TOP N limit)
+   * @param orderBy order by info
+   * @param dataTableMap map from server to data table
+   * @param preserveType when {@code true}, each aggregation value is additionally passed through
+   *     {@code AggregationFunctionUtils.formatValue}
+   * @throws IllegalStateException if building the indexed table fails for any reason
+   */
+  private void setSQLGroupByOrderByResults(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
+      List<AggregationInfo> aggregationInfos, GroupBy groupBy, List<SelectionSort> orderBy,
+      Map<ServerInstance, DataTable> dataTableMap, boolean preserveType) {
+
+    List<String> columns = new ArrayList<>(dataSchema.size());
+    for (int i = 0; i < dataSchema.size(); i++) {
+      columns.add(dataSchema.getColumnName(i));
+    }
+
+    int numGroupBy = groupBy.getExpressionsSize();
+    int numAggregations = aggregationInfos.size();
+
+    IndexedTable indexedTable;
+    try {
+      indexedTable =
+          getIndexedTable(numGroupBy, numAggregations, groupBy, aggregationInfos, orderBy, dataSchema, dataTableMap);
+    } catch (Throwable throwable) {
+      // getIndexedTable declares Throwable; surface any failure as an unchecked exception with the cause kept.
+      throw new IllegalStateException(throwable);
+    }
+
+    List<AggregationFunction> aggregationFunctions = new ArrayList<>(aggregationInfos.size());
+    for (AggregationInfo aggregationInfo : aggregationInfos) {
+      aggregationFunctions.add(
+          AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction());
+    }
+
+    List<Serializable[]> rows = new ArrayList<>();
+    int numColumns = columns.size();
+    Iterator<Record> sortedIterator = indexedTable.iterator();
+    int numRows = 0;
+    while (numRows < groupBy.getTopN() && sortedIterator.hasNext()) {
+
+      Record nextRecord = sortedIterator.next();
+      Serializable[] row = new Serializable[numColumns];
+      int index = 0;
+      // Group-by key columns occupy the leading positions of the row.
+      for (Object keyColumn : nextRecord.getKey().getColumns()) {
+        row[index++] = getSerializableValue(keyColumn);
+      }
+      // Aggregation values follow; the i-th value must be finalized by the i-th aggregation function.
+      int aggNum = 0;
+      for (Object valueColumn : nextRecord.getValues()) {
+        // Advance aggNum per value so each column uses its own function (previously always function 0).
+        row[index] = getSerializableValue(aggregationFunctions.get(aggNum++).extractFinalResult(valueColumn));
+        // NOTE(review): formatting only when preserveType is true looks inverted relative to the
+        // flag's name - confirm the intended polarity.
+        if (preserveType) {
+          row[index] = AggregationFunctionUtils.formatValue(row[index]);
+        }
+        index++;
+      }
+      rows.add(row);
+      numRows++;
+    }
+
+    brokerResponseNative.setResultTable(new ResultTable(columns, rows));
+  }
+
+  private IndexedTable getIndexedTable(int numGroupBy, int numAggregations, 
GroupBy groupBy,
+      List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy, 
DataSchema dataSchema, Map<ServerInstance, DataTable> dataTableMap)
+      throws Throwable {
+
+    IndexedTable indexedTable = new ConcurrentIndexedTable();
+    int indexedTableCapacity = 1_000_000;
+    // FIXME: indexedTableCapacity should be derived from TOP. Hardcoding this 
value to a higher number until we can tune the resize
+    // int capacity = GroupByUtils.getTableCapacity((int) groupBy.getTopN());
+    indexedTable.init(dataSchema, aggregationInfos, orderBy, 
indexedTableCapacity);
+
+    for (DataTable dataTable : dataTableMap.values()) {
+      CheckedFunction2[] functions = new CheckedFunction2[dataSchema.size()];
 
 Review comment:
   Changed. Thanks for the suggestion. BiFunction is better.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to