paul-rogers commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990737548


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    // In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));

Review Comment:
   When sorting a big list, we want the most efficient per-row storage format available. By storing rows as plain arrays, rather than wrapping each row in another class, we get a more compact representation: handy if we have to sort the full five million rows in a typical segment.
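As a minimal sketch of the array-based idea (hypothetical code, not the Druid API): each row is a plain `Object[]`, and the comparator walks the sort-key column indexes directly, so there is no per-row wrapper object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ArrayRowSort
{
  // Compare two rows column-by-column at the given key indexes.
  @SuppressWarnings("unchecked")
  static Comparator<Object[]> rowComparator(int[] keyIndexes)
  {
    return (left, right) -> {
      for (int idx : keyIndexes) {
        int c = ((Comparable<Object>) left[idx]).compareTo(right[idx]);
        if (c != 0) {
          return c;
        }
      }
      return 0;
    };
  }

  // Sort rows in place: each row is one Object[], with no per-row wrapper class.
  static void sortRows(List<Object[]> rows, int[] keyIndexes)
  {
    rows.sort(rowComparator(keyIndexes));
  }

  public static void main(String[] args)
  {
    List<Object[]> rows = new ArrayList<>(Arrays.asList(
        new Object[]{"b", 2L},
        new Object[]{"a", 3L},
        new Object[]{"a", 1L}
    ));
    sortRows(rows, new int[]{0, 1});  // order by column 0, then column 1
    System.out.println(Arrays.toString(rows.get(0)));  // [a, 1]
  }
}
```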
   
   Building on the `ScanResultValue` notion is one option. Each `ScanResultValue` contains a batch of rows. To sort the full result set, one solution is to copy all values into one big array, which is what my prior notes assumed. The other way, which you might have in mind, is to sort each `ScanResultValue`, then merge the sorted batches to get the final results. That can work: it might be more efficient than copying the rows into one big array. We'd still store all the rows, just not in one big array.
   
   The question here is the size of the merge. I think each batch in 
`ScanResultValue` defaults to around 20K rows. If we have to sort a full 
segment, we'd have 5 million rows, or 5M / 20K = 250 batches. This would result 
in a pretty wide merge. Of course, most queries will return much smaller result 
sets.
   
   In any event, whichever choice is selected can be encapsulated in the sorter 
class described earlier.
   
   In fact, to be really fancy, we can defer the decision until all results arrive. If there is only one batch: sort it and return it (possibly limited). If there are fewer than, say, 10 or 20, sort each and merge. If there are more, combine batches until we get down to 10 to 20 runs, sort each run, and merge the results.
   
   Once we combine more than two batches, however, we have to create new output batches holding the rows from multiple input batches in sorted order.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

