599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r1047363771
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue> iterFromMake)
     );
   }
+  public Sequence<ScanResultValue> processWithMultiColumnSort(
+      ScanQuery query,
+      boolean legacy,
+      boolean hasTimeout,
+      long timeoutAt,
+      StorageAdapter adapter,
+      List<String> allColumns,
+      List<Interval> intervals,
+      SegmentId segmentId,
+      Filter filter,
+      @Nullable final QueryMetrics<?> queryMetrics
+  )
+  {
+    int limit;
+    if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(query.getScanRowsLimit());
+    }
+
+    return Sequences.concat(
+        adapter
+            .makeCursors(
+                filter,
+                intervals.get(0),
+                query.getVirtualColumns(),
+                Granularities.ALL,
+                query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+                (query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
+                queryMetrics
+            )
+            .map(cursor -> new BaseSequence<>(
+                new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
+                {
+                  @Override
+                  public Iterator<ScanResultValue> make()
+                  {
+                    final List<BaseObjectColumnValueSelector> columnSelectors = new ArrayList<>(allColumns.size());
+
+                    for (String column : allColumns) {
+                      final BaseObjectColumnValueSelector selector;
+
+                      if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+                        selector = cursor.getColumnSelectorFactory()
+                                         .makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+                      } else {
+                        selector = cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+                      }
+
+                      columnSelectors.add(selector);
+                    }
+
+                    return new Iterator<ScanResultValue>()
+                    {
+                      @Override
+                      public boolean hasNext()
+                      {
+                        return !cursor.isDone();
+                      }
+
+                      @Override
+                      public ScanResultValue next()
+                      {
+                        if (!hasNext()) {
+                          throw new NoSuchElementException();
+                        }
+                        if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
+                          throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
+                        }
+                        Sorter<Object> sorter = new QueueBasedSorter<>(limit, query.getOrderByNoneTimeResultOrdering());
+                        rowsToCompactedList(sorter);
+                        final List<List<Object>> sortedElements = new ArrayList<>(sorter.size());
+                        Iterators.addAll(sortedElements, sorter.drainElement());
Review Comment:
   1. I think it is simple enough to use the `compact list` format directly for segment-level sorting. If the user's query specifies `resultFormat=RESULT_FORMAT_LIST`, we can convert the `compact list` to a `non-compact list` at the last stage of sorting.
   2. I also considered implementing batch-level sorting in the past, but I was unsure what it would gain us. Is the goal to avoid materializing too much data at once, which could lead to an OOM?
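
   To illustrate point 1, here is a minimal sketch of the suggested last-stage conversion: sort in the compact-list representation (one positional `List<Object>` per row), then map each row onto its column names only when the caller asked for the non-compact list format. The class and method names below are illustrative, not part of Druid's API.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class CompactListConverter
   {
     /**
      * Converts compact-list rows (values positionally aligned with
      * columnNames) into the non-compact "list" format, i.e. one
      * column-name-to-value map per row. Intended to run once, after
      * sorting has finished, so the sort itself stays in compact form.
      */
     public static List<Map<String, Object>> toListFormat(
         List<String> columnNames,
         List<List<Object>> compactRows
     )
     {
       final List<Map<String, Object>> rows = new ArrayList<>(compactRows.size());
       for (List<Object> compactRow : compactRows) {
         // LinkedHashMap preserves the query's column order in the output.
         final Map<String, Object> row = new LinkedHashMap<>();
         for (int i = 0; i < columnNames.size(); i++) {
           row.put(columnNames.get(i), compactRow.get(i));
         }
         rows.add(row);
       }
       return rows;
     }
   }
   ```

   This keeps per-row overhead low during the sort (no map allocation per comparison) and pays the map-building cost only for the rows that survive the limit.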
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]