paul-rogers commented on code in PR #13031:
URL: https://github.com/apache/druid/pull/13031#discussion_r985002018
##########
processing/src/main/java/org/apache/druid/query/scan/OrderByLimitQueryRunner.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.apache.druid.collections.MultiColumnSorter;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+class OrderByLimitQueryRunner implements QueryRunner<ScanResultValue>

Review Comment:
I made an earlier comment about the complexity that results from combining sorting with cursor handling, since cursor handling is already complex. I've been thinking about what I could suggest instead, until we get the operator-based solution working. Here's an idea.

Leave the cursor layer as-is: it produces a set of rows as Java arrays. If the query runner sees that a sort is requested, insert another `Sequence` layer above the cursor. The native query stuff is a bit messy, but here's the gist.

Each cursor represents one slice of time. If the sort spec starts with `__time`, then the results from each cursor can be concatenated to produce the sorted result for a segment. If the key isn't `__time`, then do a two-level sort-merge: use the sorter from above to sort the results from each cursor, hold on to all these sorted lists until all cursors are done, then merge them using the priority queue mechanism you have in the above file. Voilà: a sorted per-segment result.

Results from multiple segments must also be merged, but the code to check for merges already exists; you may have to modify it to handle your custom merge.

The one drawback of this, mentioned above, is memory pressure. But since there is no good solution at present, we can start by ignoring the memory issue.
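The merge half of that two-level sort-merge (each cursor's rows already sorted, then combined with a priority queue keyed on each list's head) can be sketched in plain Java. This is only an illustration of the mechanics under the assumptions above; `mergeSorted` and `HeadedIterator` are names I made up, not Druid APIs:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class SortMergeSketch
{
  // Merges lists that are each already sorted by cmp into one sorted
  // list, using a priority queue keyed on each list's current head.
  public static <T> List<T> mergeSorted(List<List<T>> sortedLists, Comparator<? super T> cmp)
  {
    PriorityQueue<HeadedIterator<T>> queue =
        new PriorityQueue<>((a, b) -> cmp.compare(a.head, b.head));
    for (List<T> list : sortedLists) {
      Iterator<T> it = list.iterator();
      if (it.hasNext()) {
        queue.add(new HeadedIterator<>(it.next(), it));
      }
    }
    List<T> merged = new ArrayList<>();
    while (!queue.isEmpty()) {
      HeadedIterator<T> entry = queue.poll();
      merged.add(entry.head);
      if (entry.rest.hasNext()) {
        // Re-insert the iterator with its next element as the new head.
        queue.add(new HeadedIterator<>(entry.rest.next(), entry.rest));
      }
    }
    return merged;
  }

  // An iterator paired with the element most recently pulled from it.
  private static class HeadedIterator<T>
  {
    final T head;
    final Iterator<T> rest;

    HeadedIterator(T head, Iterator<T> rest)
    {
      this.head = head;
      this.rest = rest;
    }
  }
}
```

In the per-segment case, each inner list would be one cursor's sorted batch; the same merge then applies a second time across segments.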
This layered trick is what we've done for operators, so your solution will have all the pieces needed when we convert this code: we'll just wrap your solution in an operator interface rather than a `Sequence` interface.

##########
processing/src/main/java/org/apache/druid/collections/MultiColumnSorter.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Ordering;
+import org.apache.druid.java.util.common.ISE;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+public class MultiColumnSorter<T>

Review Comment:
Sorting is a tricky beast!

First, there are the memory issues. In the worst case, someone might sort all columns for all rows in a segment, which might create excessive memory use. Druid has no good solution at present; we'd just have to be aware of the issue.

As for sort algorithms, there are many. Back in Apache Drill, we found that the Hadoop sorter worked better than the stock Java sorter.
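For comparison, the stock-Java-sort baseline for rows held as `Object[]` with a shared schema looks roughly like this. A hedged sketch: `rowComparator`, its signature, and its parameters are illustrative, not an existing Druid API, and it assumes key values implement `Comparable`:

```java
import java.util.Comparator;

public class RowComparatorSketch
{
  // Builds a comparator over rows held as Object[] that compares the
  // given key columns in order; descending[i] flips the i-th key.
  // Assumes at least one key column and that rows share a schema.
  public static Comparator<Object[]> rowComparator(int[] keyColumns, boolean[] descending)
  {
    Comparator<Object[]> result = null;
    for (int i = 0; i < keyColumns.length; i++) {
      final int col = keyColumns[i];
      Comparator<Object[]> next = (a, b) -> {
        @SuppressWarnings("unchecked")
        final int c = ((Comparable<Object>) a[col]).compareTo(b[col]);
        return c;
      };
      if (descending[i]) {
        next = next.reversed();
      }
      // Chain the per-column comparators in key order.
      result = (result == null) ? next : result.thenComparing(next);
    }
    return result;
  }
}
```

A comparator like this can be handed straight to `Arrays.sort` or `List.sort`, and wrapping the whole thing behind a small sorter interface keeps the door open for a better algorithm later.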
MSQ introduced a disk-based "super sorter", which is overkill here. There are probably some sorts going on in ingestion, but a quick search of the code didn't uncover anything obvious. Perhaps a message in the Druid dev Slack channel would offer some pointers.

In any event, the sorter can be wrapped in an interface, since sorts are the kind of thing that can always be improved. For this use case, perhaps we can assume all rows have the same schema and are Java objects in an array, and just use the Java sort options along with a comparator that compares columns in key order.

##########
processing/src/main/java/org/apache/druid/query/scan/OrderByLimitSequence.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+class OrderByLimitSequence extends BaseSequence<ScanResultValue, Iterator<ScanResultValue>>

Review Comment:
Once you have the basic sort/merge working, you can add limits. There are two cases.

In the sort phase, the entire result set must be sorted before the limit can be applied. So you can simply plop a limit sequence on top of your order sequence, which is what we do in the operator case.

In the merge phase, you can have one sequence do the merge, ignorant of any limit, and have a limit sequence sit on top. The limit sequence just stops reading from the merge, and closes it, once it has reached the limit. In this way, each operation is separate and simple: the sort and merge don't also need to know how to apply the limit.

It would be the query runner that works out how to stack these sequences. We'd have to work out exactly which one does the deed, but my guess is that it is the one that works with the scan engine: it just inserts extra sequences as needed for sort/merge/limit.
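The "limit sequence stops reading and closes the merge" behavior can be sketched with a plain iterator wrapper. The names here are illustrative; Druid's real `Sequence` machinery differs, but the layering idea is the same:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Forwards rows from a delegate (e.g. a merge) until the limit is
// reached, then closes the delegate's resources so the merge stops.
// The merge itself never needs to know a limit exists.
public class LimitingIterator<T> implements Iterator<T>
{
  private final Iterator<T> delegate;
  private final Closeable resource;
  private long remaining;
  private boolean closed;

  public LimitingIterator(Iterator<T> delegate, Closeable resource, long limit)
  {
    this.delegate = delegate;
    this.resource = resource;
    this.remaining = limit;
  }

  @Override
  public boolean hasNext()
  {
    if (remaining <= 0 || !delegate.hasNext()) {
      closeOnce();  // stop reading early and release the underlying source
      return false;
    }
    return true;
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    remaining--;
    return delegate.next();
  }

  private void closeOnce()
  {
    if (!closed) {
      closed = true;
      try {
        resource.close();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }
}
```

Stacking then amounts to `new LimitingIterator<>(mergeIterator, mergeResources, limit)`: the limit layer, the merge layer, and the sort layer each stay single-purpose.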
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
