paul-rogers commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990736577


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    // In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));
+          }
+        }
+        yielder = yielder.next(null);
+        doneScanning = yielder.isDone();
+      }
+      final List<ScanResultValue> sortedElements = new ArrayList<>(sorter.size());
+      Iterators.addAll(sortedElements, sorter.drainElement());

Review Comment:
   There should be a deterministic ordering of the column values in the array. I believe that the select list in the `ScanQuery` provides that order. This is, in fact, the order used in the compact array. As a result, all of the cursors for a single segment should have the same order of columns within the compact array. Within a single segment, there is no danger of one cursor having the column and another not having it. That issue arises between segments, but it would occur during merge.
   
   So, we can map from the cursor into a "compact list" (that is, a list of arrays) that we combine into a single big list for sorting. When we break the sorted list down into batches, all the batches will also have the same column order within the arrays.
   
   We do have to preserve the headers: I omitted that detail from my long note. The sorter can hold onto the first header it sees, and reuse that header for all batches it creates after sorting.
   
   To verify the assertion that all arrays have the same column positions within the array, on the second and subsequent batches we can spin through the header to verify that the column order is identical to the first header.
   
   If the query requests a (non-compact) list, then we have to convert the array-based rows into maps. To do this, we can use the header we saved above, then loop through each row to create the required map. That's going to be inefficient, so I hope few queries ask for the list-of-maps format!
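   The conversion itself is a straightforward loop over the saved header; a sketch (again with hypothetical names, not this PR's code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: rebuild list-of-maps rows from compact arrays using the
// header saved from the first batch. A LinkedHashMap preserves the header's
// column order in each output event.
public class RowsToMaps
{
  public static List<Map<String, Object>> toMaps(List<String> header, List<Object[]> rows)
  {
    List<Map<String, Object>> out = new ArrayList<>(rows.size());
    for (Object[] row : rows) {
      Map<String, Object> event = new LinkedHashMap<>();
      for (int i = 0; i < header.size(); i++) {
        event.put(header.get(i), row[i]);
      }
      out.add(event);
    }
    return out;
  }
}
```

   This allocates one map per row, which is exactly the inefficiency noted above, so it should only run when the query explicitly asks for the list result format.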
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

