paul-rogers commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r1046251965
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -445,6 +432,63 @@ public Ordering<ScanResultValue> getResultOrdering()
);
}
+ public List<Integer> getSortColumnIdxs()
Review Comment:
Nit: Idxs -> Indexes
Druid likes to avoid abbreviations (hence ourReallyLongVariableNames.)
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -445,6 +432,63 @@ public Ordering<ScanResultValue> getResultOrdering()
);
}
+ public List<Integer> getSortColumnIdxs()
+ {
+ List<String> allColumns;
+ if (legacy &&
!getColumns().contains(ScanQueryEngine.LEGACY_TIMESTAMP_KEY)) {
+ allColumns = new ArrayList<>(getColumns().size() + 1);
+ allColumns.add(ScanQueryEngine.LEGACY_TIMESTAMP_KEY);
+ } else {
+ allColumns = new ArrayList<>(getColumns().size());
+ }
+
+ allColumns.addAll(getColumns());
+ return getOrderBys()
+ .stream()
+ .map(orderBy -> allColumns.indexOf(orderBy.getColumnName()))
+ .collect(Collectors.toList());
+ }
+
+ public Ordering<List<Object>> getOrderByNoneTimeResultOrdering()
Review Comment:
Nit: improve the method name. Is this saying to get the ordering in the case
where we're not ordering by __time? If so, maybe `getGenericResultOrdering()`
with the time being the specialized case.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue>
iterFromMake)
);
}
+ public Sequence<ScanResultValue> processWithMultiColumnSort(
+ ScanQuery query,
+ boolean legacy,
+ boolean hasTimeout,
+ long timeoutAt,
+ StorageAdapter adapter,
+ List<String> allColumns,
+ List<Interval> intervals,
+ SegmentId segmentId,
+ Filter filter,
+ @Nullable final QueryMetrics<?> queryMetrics
+ )
+ {
+
+ int limit;
+ if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+ limit = Integer.MAX_VALUE;
+ } else {
+ limit = Math.toIntExact(query.getScanRowsLimit());
+ }
+
+ return Sequences.concat(
+ adapter
+ .makeCursors(
+ filter,
+ intervals.get(0),
+ query.getVirtualColumns(),
+ Granularities.ALL,
+ query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+ (query.getTimeOrder().equals(ScanQuery.Order.NONE) &&
query.isDescending()),
+ queryMetrics
+ )
+ .map(cursor -> new BaseSequence<>(
+ new BaseSequence.IteratorMaker<ScanResultValue,
Iterator<ScanResultValue>>()
+ {
+ @Override
+ public Iterator<ScanResultValue> make()
+ {
+ final List<BaseObjectColumnValueSelector> columnSelectors
= new ArrayList<>(allColumns.size());
+
+ for (String column : allColumns) {
+ final BaseObjectColumnValueSelector selector;
+
+ if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+ selector = cursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+ } else {
+ selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+ }
+
+ columnSelectors.add(selector);
+ }
+
+ return new Iterator<ScanResultValue>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return !cursor.isDone();
+ }
+
+ @Override
+ public ScanResultValue next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (hasTimeout && System.currentTimeMillis() >=
timeoutAt) {
+ throw new
QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out",
query.getId()));
+ }
+ Sorter<Object> sorter = new QueueBasedSorter<>(limit,
query.getOrderByNoneTimeResultOrdering());
+ rowsToCompactedList(sorter);
+ final List<List<Object>> sortedElements = new
ArrayList<>(sorter.size());
+ Iterators.addAll(sortedElements,
sorter.drainElement());
+ return new ScanResultValue(segmentId.toString(),
allColumns, sortedElements);
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private void rowsToCompactedList(Sorter<Object> sorter)
Review Comment:
This isn't really a `rowsToCompactedList` is it? More like a `rowsToSorter`?
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -496,6 +540,30 @@ public ScanQuery withOverriddenContext(Map<String, Object>
contextOverrides)
return
Druids.ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(),
contextOverrides)).build();
}
+ /**
+ * Report whether the sort can be pushed into the Cursor, or must be done as
a
+ * separate sort step.
+ */
+ public boolean canPushSort()
Review Comment:
Nit: `ScanQuery` is a public class. The public API is cleaner if we separate
out implementation stuff from query definition stuff. Suggestion, move these
new methods elsewhere, say a `ScanQueries` class or on one of the
implementation classes.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue>
iterFromMake)
);
}
+ public Sequence<ScanResultValue> processWithMultiColumnSort(
+ ScanQuery query,
+ boolean legacy,
+ boolean hasTimeout,
+ long timeoutAt,
+ StorageAdapter adapter,
+ List<String> allColumns,
+ List<Interval> intervals,
+ SegmentId segmentId,
+ Filter filter,
+ @Nullable final QueryMetrics<?> queryMetrics
+ )
+ {
+
+ int limit;
+ if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+ limit = Integer.MAX_VALUE;
+ } else {
+ limit = Math.toIntExact(query.getScanRowsLimit());
+ }
+
+ return Sequences.concat(
+ adapter
+ .makeCursors(
+ filter,
+ intervals.get(0),
+ query.getVirtualColumns(),
+ Granularities.ALL,
+ query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+ (query.getTimeOrder().equals(ScanQuery.Order.NONE) &&
query.isDescending()),
+ queryMetrics
+ )
+ .map(cursor -> new BaseSequence<>(
+ new BaseSequence.IteratorMaker<ScanResultValue,
Iterator<ScanResultValue>>()
+ {
+ @Override
+ public Iterator<ScanResultValue> make()
+ {
+ final List<BaseObjectColumnValueSelector> columnSelectors
= new ArrayList<>(allColumns.size());
+
+ for (String column : allColumns) {
+ final BaseObjectColumnValueSelector selector;
+
+ if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+ selector = cursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+ } else {
+ selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+ }
+
+ columnSelectors.add(selector);
+ }
+
+ return new Iterator<ScanResultValue>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return !cursor.isDone();
+ }
+
+ @Override
+ public ScanResultValue next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (hasTimeout && System.currentTimeMillis() >=
timeoutAt) {
+ throw new
QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out",
query.getId()));
+ }
+ Sorter<Object> sorter = new QueueBasedSorter<>(limit,
query.getOrderByNoneTimeResultOrdering());
+ rowsToCompactedList(sorter);
+ final List<List<Object>> sortedElements = new
ArrayList<>(sorter.size());
+ Iterators.addAll(sortedElements,
sorter.drainElement());
+ return new ScanResultValue(segmentId.toString(),
allColumns, sortedElements);
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private void rowsToCompactedList(Sorter<Object> sorter)
+ {
+ for (; !cursor.isDone(); cursor.advance()) {
Review Comment:
We want to apply a limit. Yet, is unlimited. Are we relying on the cursor to
apply the limit?
A scan query can return multiple cursors (one for each time chunk, I
believe; I'm a bit unclear on the details.) In that case, the limit has to be
adjusted for each; or we need to enforce an outer limit here.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue>
iterFromMake)
);
}
+ public Sequence<ScanResultValue> processWithMultiColumnSort(
+ ScanQuery query,
+ boolean legacy,
+ boolean hasTimeout,
+ long timeoutAt,
+ StorageAdapter adapter,
+ List<String> allColumns,
+ List<Interval> intervals,
+ SegmentId segmentId,
+ Filter filter,
+ @Nullable final QueryMetrics<?> queryMetrics
+ )
+ {
+
+ int limit;
+ if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+ limit = Integer.MAX_VALUE;
+ } else {
+ limit = Math.toIntExact(query.getScanRowsLimit());
+ }
+
+ return Sequences.concat(
+ adapter
+ .makeCursors(
+ filter,
+ intervals.get(0),
+ query.getVirtualColumns(),
+ Granularities.ALL,
+ query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+ (query.getTimeOrder().equals(ScanQuery.Order.NONE) &&
query.isDescending()),
+ queryMetrics
+ )
+ .map(cursor -> new BaseSequence<>(
+ new BaseSequence.IteratorMaker<ScanResultValue,
Iterator<ScanResultValue>>()
+ {
+ @Override
+ public Iterator<ScanResultValue> make()
+ {
+ final List<BaseObjectColumnValueSelector> columnSelectors
= new ArrayList<>(allColumns.size());
+
+ for (String column : allColumns) {
+ final BaseObjectColumnValueSelector selector;
+
+ if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+ selector = cursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+ } else {
+ selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+ }
+
+ columnSelectors.add(selector);
+ }
+
+ return new Iterator<ScanResultValue>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return !cursor.isDone();
+ }
+
+ @Override
+ public ScanResultValue next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (hasTimeout && System.currentTimeMillis() >=
timeoutAt) {
+ throw new
QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out",
query.getId()));
+ }
+ Sorter<Object> sorter = new QueueBasedSorter<>(limit,
query.getOrderByNoneTimeResultOrdering());
+ rowsToCompactedList(sorter);
+ final List<List<Object>> sortedElements = new
ArrayList<>(sorter.size());
+ Iterators.addAll(sortedElements,
sorter.drainElement());
+ return new ScanResultValue(segmentId.toString(),
allColumns, sortedElements);
+ }
+
+ @Override
+ public void remove()
Review Comment:
This method has had a default implementation in Java for quite a few years
now, as it turns out.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderBySequence.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.GuavaUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.context.ResponseContext;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Scan the segments in parallel, complete the sorting of each batch within
each segment, and then complete the sorting of each segment level
+ */
+public class ScanQueryOrderBySequence extends BaseSequence<ScanResultValue,
Iterator<ScanResultValue>>
+{
+ public ScanQueryOrderBySequence(
+ QueryPlus<ScanResultValue> queryPlus,
+ QueryProcessingPool queryProcessingPool,
+ Iterable<QueryRunner<ScanResultValue>> queryables,
+ ResponseContext responseContext
+ )
+ {
+ super(new ScanQueryOrderByIteratorMaker(queryPlus, queryProcessingPool,
queryables, responseContext));
+ }
+
+ static class ScanQueryOrderByIteratorMaker
+ implements BaseSequence.IteratorMaker<ScanResultValue,
Iterator<ScanResultValue>>
+ {
+ private static final Logger log = new
Logger(ScanQueryOrderByIteratorMaker.class);
+ private final QueryPlus<ScanResultValue> threadSafeQueryPlus;
+ private final ResponseContext responseContext;
+ private final Iterable<QueryRunner<ScanResultValue>> queryables;
+ private final QueryProcessingPool queryProcessingPool;
+
+ ScanQueryOrderByIteratorMaker(
+ QueryPlus<ScanResultValue> queryPlus,
+ QueryProcessingPool queryProcessingPool,
+ Iterable<QueryRunner<ScanResultValue>> queryables,
+ ResponseContext responseContext
+ )
+ {
+ this.threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
+ this.responseContext = responseContext;
+ this.queryables = queryables;
+ this.queryProcessingPool = queryProcessingPool;
+ }
+
+ @Override
+ public Iterator<ScanResultValue> make()
+ {
+ ScanQuery query = (ScanQuery) threadSafeQueryPlus.getQuery();
+ QueryContext queryContext = QueryContext.of(query.getContext());
+ List<ListenableFuture<ScanResultValue>> futures =
+ Lists.newArrayList(
+ Iterables.transform(
+ queryables,
+ input -> {
+ if (input == null) {
+ throw new ISE("Null queryRunner! Looks to be some
segment unmapping action happening");
+ }
+
+ return queryProcessingPool.submitRunnerTask(
+ new
AbstractPrioritizedQueryRunnerCallable<ScanResultValue, ScanResultValue>(
+ queryContext.getPriority(),
+ input
+ )
+ {
+ @Override
+ public ScanResultValue call()
+ {
+ try {
+ Sequence<ScanResultValue> result =
input.run(threadSafeQueryPlus, responseContext);
+ if (result == null) {
+ throw new ISE("Got a null result! Segments are
missing!");
+ }
+
+ Iterator<ScanResultValue> it =
result.toList().iterator();
+ List<List<Object>> eventList = new ArrayList<>();
+ List<String> columns = new ArrayList<>();
+ String segmentId = null;
+ while (it.hasNext()) {
Review Comment:
Maybe a comment to explain what we're doing? Looks like we read a set of
scan result values and combine their rows into one bug scan result value.
Again, the idea with scan query is to limit batch size. Does something else
where break the results back into batches? Or, maybe copies these values into a
sorter? If copy into a sorter, should that be done here, then later merge the
results?
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -66,7 +72,12 @@ public Sequence<ScanResultValue> process(
@Nullable final QueryMetrics<?> queryMetrics
)
{
- if (segment.asQueryableIndex() != null &&
segment.asQueryableIndex().isFromTombstone()) {
+ QueryableIndex queryableIndex = segment.asQueryableIndex();
+ if (!query.canPushSort() && Objects.nonNull(queryableIndex)) {
+ return new QueryableIndexOrderbyRunner().process(query, segment,
responseContext, queryMetrics, queryableIndex);
+ }
+
+ if (queryableIndex != null && queryableIndex.isFromTombstone()) {
Review Comment:
Probably want to do this first? There is nothing to sort in a tombstone.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -496,6 +540,30 @@ public ScanQuery withOverriddenContext(Map<String, Object>
contextOverrides)
return
Druids.ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(),
contextOverrides)).build();
}
+ /**
+ * Report whether the sort can be pushed into the Cursor, or must be done as
a
+ * separate sort step.
+ */
+ public boolean canPushSort()
+ {
+ // Can push non-existent sort.
+ if (orderBys.size() == 0) {
+ return true;
+ }
+ // Cursor can sort by only one column.
+ if (orderBys.size() > 1) {
+ return false;
+ }
+ // Inline datasources can't sort
+ if (getDataSource() instanceof InlineDataSource) {
+ return false;
+ }
+ // Cursor can't sort by the __time column
+ return
ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName());
Review Comment:
I thought the cursor _can_ sort by __time (since that's how the data is
organize), but not by anything else?
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -445,6 +432,63 @@ public Ordering<ScanResultValue> getResultOrdering()
);
}
+ public List<Integer> getSortColumnIdxs()
+ {
+ List<String> allColumns;
+ if (legacy &&
!getColumns().contains(ScanQueryEngine.LEGACY_TIMESTAMP_KEY)) {
+ allColumns = new ArrayList<>(getColumns().size() + 1);
+ allColumns.add(ScanQueryEngine.LEGACY_TIMESTAMP_KEY);
+ } else {
+ allColumns = new ArrayList<>(getColumns().size());
+ }
+
+ allColumns.addAll(getColumns());
+ return getOrderBys()
+ .stream()
+ .map(orderBy -> allColumns.indexOf(orderBy.getColumnName()))
Review Comment:
Here, has some prior code ensured that the order by contains only column
names, and that each column name is projected? Else, we'll have -1 values in
the list.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderByMergeRowIterator.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import org.apache.druid.collections.QueueBasedSorter;
+import org.apache.druid.collections.Sorter;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.utils.CollectionUtils;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ScanQueryOrderByMergeRowIterator extends ScanQueryLimitRowIterator
+{
+
+ public ScanQueryOrderByMergeRowIterator(
+ QueryRunner<ScanResultValue> baseRunner,
+ QueryPlus<ScanResultValue> queryPlus,
+ ResponseContext responseContext
+ )
+ {
+ super(baseRunner, queryPlus, responseContext);
+ if
(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
+ throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is
not supported yet");
+ }
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return !yielder.isDone();
+ }
+
+ @Override
+ public ScanResultValue next()
+ {
+
+ final int scanRowsLimit;
+ if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+ scanRowsLimit = Integer.MAX_VALUE;
+ } else {
+ scanRowsLimit = Math.toIntExact(query.getScanRowsLimit());
+ }
+
+ List<String> orderByDirection = query.getOrderBys()
+ .stream()
+ .map(orderBy ->
orderBy.getOrder().toString())
+ .collect(Collectors.toList());
+ List<Integer> sortColumnIdxs = query.getSortColumnIdxs();
+
+ Ordering<Comparable>[] orderings = new Ordering[orderByDirection.size()];
Review Comment:
In an ideal world, this code wouldn't be copy/pasted here and above. Maybe
have one copy, in that "scan query utils" class: `ScanQueries` or
`ScanQueryUtils`.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue>
iterFromMake)
);
}
+ public Sequence<ScanResultValue> processWithMultiColumnSort(
+ ScanQuery query,
+ boolean legacy,
+ boolean hasTimeout,
+ long timeoutAt,
+ StorageAdapter adapter,
+ List<String> allColumns,
+ List<Interval> intervals,
+ SegmentId segmentId,
+ Filter filter,
+ @Nullable final QueryMetrics<?> queryMetrics
+ )
+ {
+
+ int limit;
+ if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+ limit = Integer.MAX_VALUE;
+ } else {
+ limit = Math.toIntExact(query.getScanRowsLimit());
+ }
+
+ return Sequences.concat(
+ adapter
+ .makeCursors(
+ filter,
+ intervals.get(0),
+ query.getVirtualColumns(),
+ Granularities.ALL,
+ query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+ (query.getTimeOrder().equals(ScanQuery.Order.NONE) &&
query.isDescending()),
+ queryMetrics
+ )
+ .map(cursor -> new BaseSequence<>(
+ new BaseSequence.IteratorMaker<ScanResultValue,
Iterator<ScanResultValue>>()
+ {
+ @Override
+ public Iterator<ScanResultValue> make()
+ {
+ final List<BaseObjectColumnValueSelector> columnSelectors
= new ArrayList<>(allColumns.size());
+
+ for (String column : allColumns) {
+ final BaseObjectColumnValueSelector selector;
+
+ if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+ selector = cursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+ } else {
+ selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+ }
+
+ columnSelectors.add(selector);
+ }
+
+ return new Iterator<ScanResultValue>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return !cursor.isDone();
+ }
+
+ @Override
+ public ScanResultValue next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (hasTimeout && System.currentTimeMillis() >=
timeoutAt) {
+ throw new
QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out",
query.getId()));
+ }
+ Sorter<Object> sorter = new QueueBasedSorter<>(limit,
query.getOrderByNoneTimeResultOrdering());
+ rowsToCompactedList(sorter);
+ final List<List<Object>> sortedElements = new
ArrayList<>(sorter.size());
+ Iterators.addAll(sortedElements,
sorter.drainElement());
+ return new ScanResultValue(segmentId.toString(),
allColumns, sortedElements);
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private void rowsToCompactedList(Sorter<Object> sorter)
+ {
+ for (; !cursor.isDone(); cursor.advance()) {
+ final List<Object> theEvent = new
ArrayList<>(allColumns.size());
Review Comment:
We're buffering the values. Using a list of objects is rather expensive. We
know the column count. Perhaps we can use a simple array as our buffered row
value. Translate that value to either a list or map when creating the scan
result values.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -445,6 +432,63 @@ public Ordering<ScanResultValue> getResultOrdering()
);
}
+ public List<Integer> getSortColumnIdxs()
+ {
+ List<String> allColumns;
+ if (legacy &&
!getColumns().contains(ScanQueryEngine.LEGACY_TIMESTAMP_KEY)) {
+ allColumns = new ArrayList<>(getColumns().size() + 1);
+ allColumns.add(ScanQueryEngine.LEGACY_TIMESTAMP_KEY);
+ } else {
+ allColumns = new ArrayList<>(getColumns().size());
+ }
+
+ allColumns.addAll(getColumns());
+ return getOrderBys()
+ .stream()
+ .map(orderBy -> allColumns.indexOf(orderBy.getColumnName()))
+ .collect(Collectors.toList());
+ }
+
+ public Ordering<List<Object>> getOrderByNoneTimeResultOrdering()
+ {
+ List<String> orderByDirection = getOrderBys().stream()
+ .map(orderBy ->
orderBy.getOrder().toString())
+ .collect(Collectors.toList());
+
+
+ Ordering<Comparable>[] orderings = new Ordering[orderByDirection.size()];
+ for (int i = 0; i < orderByDirection.size(); i++) {
+ orderings[i] =
ScanQuery.Order.ASCENDING.equals(ScanQuery.Order.fromString(orderByDirection.get(i)))
+ ? Comparators.naturalNullsFirst()
+ : Comparators.<Comparable>naturalNullsFirst().reverse();
+ }
+
+ Comparator<List<Object>> comparator = new Comparator<List<Object>>()
+ {
+
+ List<Integer> sortColumnIdxs = getSortColumnIdxs();
+
+ @Override
+ public int compare(
+ List<Object> o1,
+ List<Object> o2
+ )
+ {
+ for (int i = 0; i < sortColumnIdxs.size(); i++) {
+ int compare = orderings[i].compare(
+ (Comparable) o1.get(sortColumnIdxs.get(i)),
+ (Comparable) o2.get(sortColumnIdxs.get(i))
Review Comment:
Nit: since this is an inner loop, optimize a bit. Store the indexes in an
array (not list). Compute the list once per run, not once per comparison.
Then, get the index once per loop pass, not twice.
That is, precompute as much as possible.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderBySequence.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.GuavaUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.context.ResponseContext;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Scan the segments in parallel, complete the sorting of each batch within
each segment, and then complete the sorting of each segment level
+ */
Review Comment:
Let's think about this.
Druid already handles parallel execution of segment scans within a
historical. Are we redoing that here? Or, did the previous code not actually
run in parallel?
We can't run _cursors_ in parallel because they scan the same segment: in a
low-memory condition, we might thrash the OS cache as we try to load all the
segment slices in memory at the same time.
Druid's concurrency model is to run a limited number of threads, with those
threads picking up tasks ready to run. If we have, say, 20 cursors for a
segment, we'll have 20 tasks competing a for a small number of threads. Will
the extra complexity actually be faster than running the cursors in sequence?
Would be great to explain the parallelism model we're implementing to answer
those questions for future readers.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue>
iterFromMake)
);
}
+ public Sequence<ScanResultValue> processWithMultiColumnSort(
+ ScanQuery query,
+ boolean legacy,
+ boolean hasTimeout,
+ long timeoutAt,
+ StorageAdapter adapter,
+ List<String> allColumns,
+ List<Interval> intervals,
+ SegmentId segmentId,
+ Filter filter,
+ @Nullable final QueryMetrics<?> queryMetrics
+ )
+ {
+
+ int limit;
+ if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+ limit = Integer.MAX_VALUE;
+ } else {
+ limit = Math.toIntExact(query.getScanRowsLimit());
+ }
+
+ return Sequences.concat(
+ adapter
+ .makeCursors(
+ filter,
+ intervals.get(0),
+ query.getVirtualColumns(),
+ Granularities.ALL,
+ query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+ (query.getTimeOrder().equals(ScanQuery.Order.NONE) &&
query.isDescending()),
+ queryMetrics
+ )
+ .map(cursor -> new BaseSequence<>(
+ new BaseSequence.IteratorMaker<ScanResultValue,
Iterator<ScanResultValue>>()
+ {
+ @Override
+ public Iterator<ScanResultValue> make()
+ {
+ final List<BaseObjectColumnValueSelector> columnSelectors
= new ArrayList<>(allColumns.size());
+
+ for (String column : allColumns) {
+ final BaseObjectColumnValueSelector selector;
+
+ if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+ selector = cursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+ } else {
+ selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+ }
+
+ columnSelectors.add(selector);
+ }
+
+ return new Iterator<ScanResultValue>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return !cursor.isDone();
+ }
+
+ @Override
+ public ScanResultValue next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (hasTimeout && System.currentTimeMillis() >=
timeoutAt) {
+ throw new
QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out",
query.getId()));
+ }
+ Sorter<Object> sorter = new QueueBasedSorter<>(limit,
query.getOrderByNoneTimeResultOrdering());
+ rowsToCompactedList(sorter);
+ final List<List<Object>> sortedElements = new
ArrayList<>(sorter.size());
+ Iterators.addAll(sortedElements,
sorter.drainElement());
Review Comment:
Two items. First, Scan query supports two internal formats: a list (really,
a map) and a compact list. We probably need to support both as the output
format for the scan result value. Unless someone can confirm that we never use
the non-compact list format any longer.
Second, scan query has a batch size; When returning scan result values, we
need to divide up the results into batches no larger than the limit. This means
your iterator will be called more than once, if the row count exceeds the batch
size.
Although Druid loves deeply nested inner classes, your reviewers would
appreciate if such complex classes were converted to top-level classes: either
in their own files, or as a nested, named class. The usual arguments against
excessive code complexity apply.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderByMergeRowIterator.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import org.apache.druid.collections.QueueBasedSorter;
+import org.apache.druid.collections.Sorter;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.utils.CollectionUtils;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ScanQueryOrderByMergeRowIterator extends ScanQueryLimitRowIterator
+{
+
+ public ScanQueryOrderByMergeRowIterator(
+ QueryRunner<ScanResultValue> baseRunner,
+ QueryPlus<ScanResultValue> queryPlus,
+ ResponseContext responseContext
+ )
+ {
+ super(baseRunner, queryPlus, responseContext);
+ if
(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
+ throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is
not supported yet");
+ }
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return !yielder.isDone();
+ }
+
+ @Override
+ public ScanResultValue next()
+ {
+
+ final int scanRowsLimit;
+ if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+ scanRowsLimit = Integer.MAX_VALUE;
+ } else {
+ scanRowsLimit = Math.toIntExact(query.getScanRowsLimit());
+ }
+
+ List<String> orderByDirection = query.getOrderBys()
+ .stream()
+ .map(orderBy ->
orderBy.getOrder().toString())
+ .collect(Collectors.toList());
+ List<Integer> sortColumnIdxs = query.getSortColumnIdxs();
+
+ Ordering<Comparable>[] orderings = new Ordering[orderByDirection.size()];
+ for (int i = 0; i < orderByDirection.size(); i++) {
+ orderings[i] =
ScanQuery.Order.ASCENDING.equals(ScanQuery.Order.fromString(orderByDirection.get(i)))
+ ? Comparators.naturalNullsFirst()
+ : Comparators.<Comparable>naturalNullsFirst().reverse();
+ }
+
+ Comparator<List<Object>> comparator = (o1, o2) -> {
+ for (int i = 0; i < sortColumnIdxs.size(); i++) {
+ int compare = orderings[i].compare(
+ (Comparable) o1.get(sortColumnIdxs.get(i)),
+ (Comparable) o2.get(sortColumnIdxs.get(i))
+ );
+ if (compare != 0) {
+ return compare;
+ }
+ }
+ return 0;
+ };
+ Sorter<Object> sorter = new QueueBasedSorter<Object>(scanRowsLimit,
comparator);
+ List<String> columns = new ArrayList<>();
+ while (!yielder.isDone()) {
+ ScanResultValue srv = yielder.get();
+ columns = columns.isEmpty() ? srv.getColumns() : columns;
+ List<List<Object>> events = (List<List<Object>>) srv.getEvents();
+ for (Object event : events) {
+ if (event instanceof LinkedHashMap) {
Review Comment:
This is not ideal. The scan result value carries a format type: list or
compact list. A list is a list of maps. A compact list is a list of lists. All
incoming result values will be one or the other (depending on a setting in the
query). This gives us two choices:
* Load all items in their list or map forms, sort using that form, and
recreate outgoing batches, or
* Convert all items to one format (a list say), sort, then convert back (to
a map, if needed).
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderByMergeRowIterator.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import org.apache.druid.collections.QueueBasedSorter;
+import org.apache.druid.collections.Sorter;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.utils.CollectionUtils;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ScanQueryOrderByMergeRowIterator extends ScanQueryLimitRowIterator
+{
+
+ public ScanQueryOrderByMergeRowIterator(
+ QueryRunner<ScanResultValue> baseRunner,
+ QueryPlus<ScanResultValue> queryPlus,
+ ResponseContext responseContext
+ )
+ {
+ super(baseRunner, queryPlus, responseContext);
+ if
(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
+ throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is
not supported yet");
+ }
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return !yielder.isDone();
+ }
+
+ @Override
+ public ScanResultValue next()
+ {
+
+ final int scanRowsLimit;
+ if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+ scanRowsLimit = Integer.MAX_VALUE;
+ } else {
+ scanRowsLimit = Math.toIntExact(query.getScanRowsLimit());
+ }
+
+ List<String> orderByDirection = query.getOrderBys()
+ .stream()
+ .map(orderBy ->
orderBy.getOrder().toString())
+ .collect(Collectors.toList());
+ List<Integer> sortColumnIdxs = query.getSortColumnIdxs();
+
+ Ordering<Comparable>[] orderings = new Ordering[orderByDirection.size()];
+ for (int i = 0; i < orderByDirection.size(); i++) {
+ orderings[i] =
ScanQuery.Order.ASCENDING.equals(ScanQuery.Order.fromString(orderByDirection.get(i)))
+ ? Comparators.naturalNullsFirst()
+ : Comparators.<Comparable>naturalNullsFirst().reverse();
+ }
+
+ Comparator<List<Object>> comparator = (o1, o2) -> {
+ for (int i = 0; i < sortColumnIdxs.size(); i++) {
+ int compare = orderings[i].compare(
+ (Comparable) o1.get(sortColumnIdxs.get(i)),
+ (Comparable) o2.get(sortColumnIdxs.get(i))
+ );
+ if (compare != 0) {
+ return compare;
+ }
+ }
+ return 0;
+ };
+ Sorter<Object> sorter = new QueueBasedSorter<Object>(scanRowsLimit,
comparator);
+ List<String> columns = new ArrayList<>();
+ while (!yielder.isDone()) {
+ ScanResultValue srv = yielder.get();
+ columns = columns.isEmpty() ? srv.getColumns() : columns;
+ List<List<Object>> events = (List<List<Object>>) srv.getEvents();
+ for (Object event : events) {
+ if (event instanceof LinkedHashMap) {
+ sorter.add(Arrays.asList(((LinkedHashMap)
event).values().toArray()));
+ } else {
+ sorter.add((List<Object>) event);
+ }
+ }
+ yielder = yielder.next(null);
+ count++;
+ }
+ final List<List<Object>> sortedElements = new ArrayList<>(sorter.size());
Review Comment:
As in the earlier note: a scan result value has a maximum batch size. We
have to deliver results in batches no larger than that size.
Since "deliver of limited size batches from a sorter" is common between the
merge and scan engine steps, it would be great if we could factor out a common
implementation. That way, I don't have to comment on basically the same code
twice. DRY and all that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]