599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990792268


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));
+          }
+        }
+        yielder = yielder.next(null);
+        doneScanning = yielder.isDone();
+      }
+      final List<ScanResultValue> sortedElements = new ArrayList<>(sorter.size());
+      Iterators.addAll(sortedElements, sorter.drainElement());

Review Comment:
   Your suggestion is feasible. I have made the change. Thank you for your patience in explaining it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

