599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r1047383221
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue>
iterFromMake)
);
}
+ public Sequence<ScanResultValue> processWithMultiColumnSort(
+ ScanQuery query,
+ boolean legacy,
+ boolean hasTimeout,
+ long timeoutAt,
+ StorageAdapter adapter,
+ List<String> allColumns,
+ List<Interval> intervals,
+ SegmentId segmentId,
+ Filter filter,
+ @Nullable final QueryMetrics<?> queryMetrics
+ )
+ {
+
+ int limit;
+ if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+ limit = Integer.MAX_VALUE;
+ } else {
+ limit = Math.toIntExact(query.getScanRowsLimit());
+ }
+
+ return Sequences.concat(
+ adapter
+ .makeCursors(
+ filter,
+ intervals.get(0),
+ query.getVirtualColumns(),
+ Granularities.ALL,
+ query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+ (query.getTimeOrder().equals(ScanQuery.Order.NONE) &&
query.isDescending()),
+ queryMetrics
+ )
+ .map(cursor -> new BaseSequence<>(
+ new BaseSequence.IteratorMaker<ScanResultValue,
Iterator<ScanResultValue>>()
+ {
+ @Override
+ public Iterator<ScanResultValue> make()
+ {
+ final List<BaseObjectColumnValueSelector> columnSelectors
= new ArrayList<>(allColumns.size());
+
+ for (String column : allColumns) {
+ final BaseObjectColumnValueSelector selector;
+
+ if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+ selector = cursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+ } else {
+ selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+ }
+
+ columnSelectors.add(selector);
+ }
+
+ return new Iterator<ScanResultValue>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return !cursor.isDone();
+ }
+
+ @Override
+ public ScanResultValue next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (hasTimeout && System.currentTimeMillis() >=
timeoutAt) {
+ throw new
QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out",
query.getId()));
+ }
+ Sorter<Object> sorter = new QueueBasedSorter<>(limit,
query.getOrderByNoneTimeResultOrdering());
+ rowsToCompactedList(sorter);
+ final List<List<Object>> sortedElements = new
ArrayList<>(sorter.size());
+ Iterators.addAll(sortedElements,
sorter.drainElement());
+ return new ScanResultValue(segmentId.toString(),
allColumns, sortedElements);
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private void rowsToCompactedList(Sorter<Object> sorter)
+ {
+ for (; !cursor.isDone(); cursor.advance()) {
Review Comment:
Each segment corresponds to one cursor, and the number of rows in a segment
is limited.
The default limit here is 65535. The limit is applied by constructing the Sorter with `limit`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]