[
https://issues.apache.org/jira/browse/ARROW-17183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570793#comment-17570793
]
Jeroen van Straten commented on ARROW-17183:
--------------------------------------------
IMO, the problem is bigger than just sort/fetch: many of Substrait's relations
are expected to maintain ordering, and based on what I'm seeing here, I'm not
sure Acero can currently fulfill that requirement. This is just the opinion of
someone who mostly works on Substrait looking in, though; please forgive any
misunderstandings about what Acero currently does and does not support or
guarantee.
Imagine a query like {{SELECT some_complex_operation(col2) FROM table ORDER BY
col1 LIMIT 100}}. You could naively plan this as {{read -> project ->
sort+fetch}} and get the result in the expected order. But an optimizer would
almost certainly turn it into {{read -> sort+fetch -> project}}, so that
some_complex_operation only runs on 100 rows instead of on the whole table. It
can do this because in Substrait, projections are required to maintain order.
If Acero's ProjectNode doesn't provide this guarantee, this optimization would
change the result.
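A minimal Python sketch of why that rewrite is valid. Plain lists of dicts stand in for batches, and `project`, `sort_fetch`, and the toy `some_complex_operation` are illustrative assumptions, not Acero or Substrait APIs. Both plans return the same rows in the same order, but the rewritten one evaluates the expensive function only for the fetched rows:

```python
from typing import Callable, Dict, List

Row = Dict[str, int]


def project(rows: List[Row], fn: Callable[[int], int]) -> List[Row]:
    # Order-preserving projection: output row i is computed from input row i.
    return [{"col1": r["col1"], "out": fn(r["col2"])} for r in rows]


def sort_fetch(rows: List[Row], limit: int) -> List[Row]:
    # ORDER BY col1 LIMIT `limit` (sorted() is stable, so ties keep input order).
    return sorted(rows, key=lambda r: r["col1"])[:limit]


calls: List[int] = []


def some_complex_operation(x: int) -> int:
    calls.append(x)  # record every evaluation of the "expensive" function
    return x * x


table = [{"col1": c1, "col2": c2} for c1, c2 in [(3, 10), (1, 20), (2, 30), (5, 40)]]

naive = sort_fetch(project(table, some_complex_operation), limit=2)
naive_calls = len(calls)
calls.clear()
optimized = project(sort_fetch(table, limit=2), some_complex_operation)
print(naive == optimized, naive_calls, len(calls))  # True 4 2
```

The equivalence depends entirely on `project` emitting row i at position i. If the projection could reorder its output, the rewritten plan would still return the right two rows, but no longer in ORDER BY col1 order.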
Furthermore, while I don't think the spec states this explicitly, I would
expect {{SELECT * FROM table LIMIT 100}} to return the *first* 100 rows, and
{{SELECT * FROM table LIMIT 100 OFFSET 100}} to return the *next* 100 rows; I
wouldn't expect a random selection of 100 rows from either. Likewise, if I were
to use Substrait + Acero to convert one file format to another and, for
example, remove some columns, I would expect the output rows to be in the same
order.
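To make the expected LIMIT/OFFSET semantics concrete, here is a minimal Python sketch of a fetch that returns one specific window of rows. It assumes batches arrive in their original order; plain lists stand in for record batches, and `fetch` is a hypothetical helper, not an Acero API:

```python
from typing import Iterable, Iterator, List


def fetch(batches: Iterable[List[int]], offset: int, count: int) -> Iterator[List[int]]:
    """Yield the rows in [offset, offset + count) of an ordered batch stream."""
    start, end = offset, offset + count
    seen = 0  # number of rows in all batches before the current one
    for batch in batches:
        if seen >= end:
            break  # window already complete; stop pulling input
        lo = max(start - seen, 0)
        hi = min(end - seen, len(batch))
        if lo < hi:
            yield batch[lo:hi]  # the part of this batch inside the window
        seen += len(batch)


batches = [list(range(0, 4)), list(range(4, 8)), list(range(8, 12))]
first = [row for b in fetch(batches, offset=0, count=5) for row in b]
second = [row for b in fetch(batches, offset=5, count=5) for row in b]
print(first)   # [0, 1, 2, 3, 4]
print(second)  # [5, 6, 7, 8, 9]
```

The result is only well defined because of the ordering assumption. If the same batches could reach the fetch in a different order, the identical code would return a different "first" window.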
IMO, we can't claim that we really support Substrait if we don't meet these
expectations. Compiler analogy: if I switch to a different C++ compiler, I
would expect my std::vectors to still be contiguous in memory, and not suddenly
be ChunkedArrays because those are faster in some cases. The C++ standard says
they have to be contiguous, full stop.
We could propose adding options to Substrait to allow arbitrary order in
various places, and then reject plans that don't have that option set if we
can't fulfill the default behavior. This feels like a relatively Arrow-specific
thing to add, though, so my Substrait POV response would be to use extensions
instead... but that would mean we wouldn't support plain Substrait, and that
seems like a less-than-ideal outcome.
Instead, I propose we actually implement support for maintaining order. I had
the following idea for how we could do that with minimal impact to Acero:
- For all source nodes, add an option to include a uint64 row index/sequence
number column, through which row order can be recovered.
- For all sink nodes, add an option to order by a uint64 column (my
understanding is that this exists already). Likewise for any other
order-sensitive nodes, if any are ever added.
- Implement a SortAndFetch node that orders the dataset by some rich sort
information (comparator functions, multiple columns, etc.), optionally returns
only part of it, and optionally includes a new row index/sequence number
column so that its computed ordering can be recovered later. I believe this is
sufficient to capture all corner cases; FetchAndSort, which I interpret as
"fetch any N rows and then sort them," is not something that exists in
Substrait. The sort algorithm can be unstable; to make it stable, simply append
the current row index/sequence number column to the list of columns to sort by.
- Have the relation FromProto function also return how Substrait expects the
output of the returned declaration to be ordered, in the form of some "order
by" clause. Whenever a fetch relation, a write relation, or the sink then
consumes that declaration, use this information for the sort. If the requested
sort is too complex to push into the node corresponding to the relation being
converted (for example, I imagine we wouldn't want to pollute file-writing code
with custom sort functions; imagine if someone supplied a UDF here), first emit
a SortAndFetch node to reduce the ordering to a single uint64 column, and then
sort by that. This effectively duplicates the sort, so maybe it should be
accompanied by a warning. That being said, with the right sort algorithm,
sorting twice might not cost much, especially if you also know that the index
column has no duplicates (once you've received all rows in 0..i, you can
immediately spill those rows from memory to disk).
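The source/sink part of this proposal can be sketched in a few lines of Python. Everything here is hypothetical: plain lists stand in for record batches, a row is an `(index, value)` tuple whose index plays the role of the proposed uint64 sequence-number column, and `source_with_index`/`sink_ordered` are illustrative names, not existing Acero options:

```python
import random
from typing import List, Tuple

Row = Tuple[int, str]  # (sequence number, value)


def source_with_index(values: List[str], batch_size: int) -> List[List[Row]]:
    """Hypothetical source option: split the input into batches and tag each
    row with its original position (the proposed uint64 index column)."""
    return [
        [(start + i, v) for i, v in enumerate(values[start:start + batch_size])]
        for start in range(0, len(values), batch_size)
    ]


def project_upper(batch: List[Row]) -> List[Row]:
    """A projection that transforms the value and carries the index along."""
    return [(idx, v.upper()) for idx, v in batch]


def sink_ordered(batches: List[List[Row]]) -> List[str]:
    """Hypothetical sink option: order by the index column, then drop it."""
    rows = [row for batch in batches for row in batch]
    rows.sort(key=lambda row: row[0])
    return [v for _, v in rows]


batches = source_with_index(["a", "b", "c", "d", "e"], batch_size=2)
processed = [project_upper(b) for b in batches]
random.shuffle(processed)  # simulate batches finishing in arbitrary order
print(sink_ordered(processed))  # ['A', 'B', 'C', 'D', 'E']
```

As long as the intermediate nodes carry the index column through, they are free to emit batches in any order; only the source and the sink need to know about ordering.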
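A hypothetical SortAndFetch node, sketched in plain Python (rows are dicts; `sort_and_fetch`, `seq`, and `seq2` are made-up names). Appending the incoming sequence-number column to the sort key makes the result deterministic regardless of whether the sort algorithm is stable. Python's `sorted` is already stable, so here the tie-breaker only illustrates the idea. The optional output index records the computed order so a later node can recover it:

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


def sort_and_fetch(
    rows: List[Row],
    key: Callable[[Row], Any],
    index_col: str,
    offset: int = 0,
    count: Optional[int] = None,
    out_index_col: Optional[str] = None,
) -> List[Row]:
    """Sort by `key`, break ties on the existing index column, keep the window
    [offset, offset + count), and optionally number the output rows."""
    ordered = sorted(rows, key=lambda r: (key(r), r[index_col]))
    end = None if count is None else offset + count
    window = ordered[offset:end]
    if out_index_col is not None:
        # New dense index (0, 1, 2, ...) describing the computed order.
        window = [dict(r, **{out_index_col: i}) for i, r in enumerate(window)]
    return window


rows = [
    {"seq": 0, "col1": 3, "col2": "w"},
    {"seq": 1, "col1": 1, "col2": "x"},
    {"seq": 2, "col1": 3, "col2": "y"},
    {"seq": 3, "col1": 2, "col2": "z"},
]
top = sort_and_fetch(rows, key=lambda r: r["col1"], index_col="seq",
                     offset=1, count=2, out_index_col="seq2")
print([(r["col2"], r["seq2"]) for r in top])  # [('z', 0), ('w', 1)]
```

Rows `w` and `y` tie on `col1`; the `seq` tie-breaker guarantees `w` comes first, the same order a stable sort would produce.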
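On the cost of sorting twice: when the index column is dense (0, 1, 2, ...) and duplicate-free, like the one a SortAndFetch node would emit, the second sort does not need to hold the whole dataset. The following sketch uses a hypothetical `emit_in_index_order` built on the standard `heapq` module. It buffers only rows that arrived ahead of their turn and releases rows 0..i as soon as they have all arrived, which is the point where a real implementation could write or spill them:

```python
import heapq
from typing import Iterable, Iterator, List, Tuple


def emit_in_index_order(rows: Iterable[Tuple[int, str]]) -> Iterator[Tuple[int, str]]:
    """Reorder rows by a dense, duplicate-free index (0, 1, 2, ...).

    Rows may arrive in any order; only rows still waiting for a smaller index
    stay buffered in the heap.
    """
    pending: List[Tuple[int, str]] = []  # min-heap keyed by index
    next_index = 0
    for row in rows:
        heapq.heappush(pending, row)
        # Release the longest contiguous run starting at next_index.
        while pending and pending[0][0] == next_index:
            yield heapq.heappop(pending)
            next_index += 1
    if pending:
        raise ValueError(f"missing index {next_index}; cannot complete ordering")


arrivals = [(2, "c"), (0, "a"), (1, "b"), (4, "e"), (3, "d")]
print(list(emit_in_index_order(arrivals)))
# [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')]
```

Here row `c` waits in the buffer only until `a` and `b` arrive, and is then released together with them.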
Substrait advanced optimization extensions could be used to allow users to
selectively disable some of this functionality if they don't care about
ordering:
- at read relations to disable materialization of the row index column (and
thus appear to read the file in arbitrary order);
- at write relations to ignore sort information provided along with the result
of the input FromProto, if any (and thus appear to write the file in arbitrary
order);
- likewise at plan level for the implicit sink node;
- likewise for sort relations (and thus appear to do an unstable sort instead
of a stable one);
- likewise for fetch relations (to fetch any N rows rather than some specific
window of rows).
> [C++] Adding ExecNode with Sort and Fetch capability
> ----------------------------------------------------
>
> Key: ARROW-17183
> URL: https://issues.apache.org/jira/browse/ARROW-17183
> Project: Apache Arrow
> Issue Type: New Feature
> Components: C++
> Reporter: Vibhatha Lakmal Abeykoon
> Assignee: Vibhatha Lakmal Abeykoon
> Priority: Major
>
> Substrait integration with Acero requires the ability to fetch records, both
> sorted and unsorted.
> A fetch operation selects `K` records starting at an offset. For instance,
> pick 10 records after skipping the first 5. This can be defined as a Slice
> operation, and the records can easily be extracted in a sink node.
> A sort-and-fetch operation applies when we need to execute a fetch on sorted
> data. The main issue is that we cannot have a sort node followed by a fetch,
> because all existing node definitions supporting sort are sink nodes. Since
> no node can follow a sink node, this functionality has to take place in a
> single node.
> This is not a perfect solution for fetch and sort, but one way to do it is to
> define a sink node that sorts the records and then fetches a set of them.
> Another dilemma is whether the sort comes before or after the fetch. To handle
> both cases, there has to be a flag that selects the order of the operations.
> The objective of this ticket is to discuss a viable, efficient solution and
> to add new nodes or a method to execute such logic.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)