[ https://issues.apache.org/jira/browse/ARROW-17183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571011#comment-17571011 ]
Weston Pace commented on ARROW-17183:
-------------------------------------
The ordering conversation has come up in a few different contexts as well. I
think it is pretty clear that we will want to support this in Acero. However,
this is not too bad, even with parallel scanning and execution. My general
proposal at the moment is to label batches with a batch index that increases
monotonically from 0 (this has some consequences; for example, the filter
node may have to emit empty batches). Then, at the end of the plan, we
optionally resequence (not sort) the output as part of the sink.
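A minimal sketch of why the filter may have to emit empty batches, assuming a
simplified batch made of an index and a column of int64 values. Batch,
FilterBatch and FilterStream are hypothetical names for illustration, not
Arrow's ExecBatch or Acero's filter node. Because a downstream resequencer
waits for every index in turn, the filter keeps one output batch per input
batch, even when no rows survive.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical, simplified batch (not Arrow's ExecBatch).
struct Batch {
  int64_t index;                // monotonically increasing from 0 at the source
  std::vector<int64_t> values;  // a single column of data
};

// Keeps the input batch index. If no rows pass the predicate, it still
// returns an (empty) batch so the index sequence has no gaps.
Batch FilterBatch(const Batch& in, const std::function<bool(int64_t)>& pred) {
  Batch out{in.index, {}};
  for (int64_t v : in.values) {
    if (pred(v)) out.values.push_back(v);
  }
  return out;
}

// Applies the filter to a stream, producing exactly one output per input.
std::vector<Batch> FilterStream(const std::vector<Batch>& in,
                                const std::function<bool(int64_t)>& pred) {
  std::vector<Batch> out;
  out.reserve(in.size());
  for (const Batch& b : in) {
    out.push_back(FilterBatch(b, pred));
    // Each output keeps the index of the batch it came from.
    assert(out.back().index == b.index);
  }
  return out;
}
```

For example, filtering even values out of batches {1, 2}, {3, 5} and {4}
yields three batches, the middle one empty but still carrying index 1.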
The difference between a resequence and a sort is that the former asserts the
output is "almost ordered". For example, batch X cannot be more than N
batches behind batch X+1, where N is the "jitter" of the input. We should
generally be able to guarantee this, and N ends up being proportional to the
readahead configuration of the scanner and the number of threads. Though I
haven't 100% sold myself on this yet (and probably won't until implementation).
The advantage of resequencing is that it is a "streaming operation", i.e. we
only need a fixed size queue of length N and we don't need to worry about
spilling to disk.
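The resequencing idea can be sketched as a bounded reorder buffer, assuming
the jitter bound N is known up front. Resequencer, Push and max_jitter are
hypothetical names, not an Acero API, and throwing on a violated bound is
just one possible way to surface it. Out-of-order batches wait in the buffer
until the next expected index arrives, so memory never exceeds N batches and
no spilling is required.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical batch: only the index matters for resequencing.
struct Batch {
  int64_t index;
  std::string payload;
};

// Resequences an "almost ordered" stream using at most max_jitter buffered
// batches (a streaming operation: no full sort, no spilling).
class Resequencer {
 public:
  explicit Resequencer(std::size_t max_jitter) : max_jitter_(max_jitter) {}

  // Accepts one batch; returns every batch that can now be emitted in order.
  std::vector<Batch> Push(Batch batch) {
    assert(batch.index >= next_index_ && "batch index already emitted");
    std::vector<Batch> ready;
    pending_.emplace(batch.index, std::move(batch));
    // Drain the contiguous run starting at the next expected index.
    auto it = pending_.find(next_index_);
    while (it != pending_.end()) {
      ready.push_back(std::move(it->second));
      pending_.erase(it);
      ++next_index_;
      it = pending_.find(next_index_);
    }
    // The input violated the "almost ordered" assumption.
    if (pending_.size() > max_jitter_) {
      throw std::runtime_error("jitter bound exceeded");
    }
    return ready;
  }

 private:
  std::size_t max_jitter_;
  int64_t next_index_ = 0;
  std::map<int64_t, Batch> pending_;  // out-of-order batches keyed by index
};
```

With N = 2, pushing batches 1 and 2 emits nothing; pushing batch 0 then
releases 0, 1 and 2 together. With N = 1 the same arrival order is rejected.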
Various nodes then manipulate the batch index, and this corresponds exactly to
the "orderedness" property of Substrait. For example, filter and project respect
the batch index. Sort assigns a new batch index. Join and aggregate clear the
batch index (set it to -1). The resequencing sink, if it encounters a -1 batch
index, will just immediately output the batches with no ordering guarantee.
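The propagation rules in this paragraph can be summarized as a small lookup.
This is a sketch only: NodeKind, OutputIndex, kUnordered and
SinkMustResequence are hypothetical names, not Acero's API, and
sort_output_index stands in for whatever position a sort assigns to the
batch it emits.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical node categories covered by the rules above.
enum class NodeKind { kFilter, kProject, kSort, kJoin, kAggregate };

// Index value meaning "no ordering guarantee".
constexpr int64_t kUnordered = -1;

// Returns the batch index a node attaches to its output batch.
int64_t OutputIndex(NodeKind kind, int64_t input_index,
                    int64_t sort_output_index) {
  switch (kind) {
    case NodeKind::kFilter:
    case NodeKind::kProject:
      return input_index;  // order-preserving: keep the index
    case NodeKind::kSort:
      assert(sort_output_index >= 0);
      return sort_output_index;  // sort establishes a new order
    case NodeKind::kJoin:
    case NodeKind::kAggregate:
      return kUnordered;  // ordering is lost
  }
  return kUnordered;
}

// The sink only buffers and resequences when batches carry a real index;
// -1 batches are passed through immediately.
bool SinkMustResequence(int64_t index) { return index != kUnordered; }
```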
> [C++] Adding ExecNode with Sort and Fetch capability
> ----------------------------------------------------
>
> Key: ARROW-17183
> URL: https://issues.apache.org/jira/browse/ARROW-17183
> Project: Apache Arrow
> Issue Type: New Feature
> Components: C++
> Reporter: Vibhatha Lakmal Abeykoon
> Assignee: Vibhatha Lakmal Abeykoon
> Priority: Major
>
> In Substrait integrations with Acero, one required capability is fetching
> records, both sorted and unsorted.
> A fetch operation is defined as selecting `K` records starting at an offset;
> for instance, pick 10 records after skipping the first 5. We can define
> this as a Slice operation, and the records can be easily extracted in a
> sink node.
> A Sort and Fetch operation applies when we need to execute a Fetch operation
> on sorted data. The main issue is that we cannot have a sort node followed by
> a fetch node, because all existing node definitions that support sorting are
> sink nodes. Since no node can follow a sink, this functionality has to take
> place in a single node.
> This is not a perfect solution for fetch and sort, but one way to do it is
> to define a sink node in which the records are sorted and then a set of items
> is fetched.
> Another dilemma is the order of the operations: a sort may be followed by a
> fetch, or a fetch by a sort. In that case, there has to be a flag that
> controls the order of the operations.
> The objective of this ticket is to discuss a viable, efficient solution and
> to add new nodes or a method that executes this logic.
>
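The unsorted fetch described in the ticket (skip an offset, keep `K` records)
can be done as a streaming slice over incoming batches. A minimal sketch,
assuming batches are modeled as vectors of int64 values; FetchState, Process
and Finished are hypothetical names, not an Arrow API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical streaming fetch: skip `offset` rows, then keep `count` rows,
// slicing each incoming batch as it arrives.
class FetchState {
 public:
  FetchState(int64_t offset, int64_t count)
      : to_skip_(offset), to_take_(count) {
    assert(offset >= 0 && count >= 0);
  }

  // Returns the part of `batch` that belongs to the fetch (possibly empty).
  std::vector<int64_t> Process(const std::vector<int64_t>& batch) {
    const int64_t len = static_cast<int64_t>(batch.size());
    const int64_t skip = std::min(to_skip_, len);
    to_skip_ -= skip;
    const int64_t take = std::min(to_take_, len - skip);
    to_take_ -= take;
    return std::vector<int64_t>(batch.begin() + skip,
                                batch.begin() + skip + take);
  }

  // Once true, no further input is needed.
  bool Finished() const { return to_take_ == 0; }

 private:
  int64_t to_skip_;  // rows still to be skipped
  int64_t to_take_;  // rows still to be emitted
};
```

Feeding four batches of four consecutive integers starting at 0 through
FetchState(5, 10) yields the rows 5 through 14, matching the "pick 10
records skipping the first 5" example.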
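The single-node approach from the ticket (a sink that sorts and then
fetches, with a flag for the order of the two operations) might look
roughly like this. It is a sketch under assumptions: FetchOrder and
SortAndFetch are hypothetical names, rows are plain int64 values, and using
std::partial_sort for the sort-then-fetch case is a choice made here, not
something the ticket specifies.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical flag selecting the order of the two operations.
enum class FetchOrder { kSortThenFetch, kFetchThenSort };

// The sink has accumulated all rows; apply sort and fetch (skip `offset`,
// keep `count`) in the order selected by `order`.
std::vector<int64_t> SortAndFetch(std::vector<int64_t> rows, std::size_t offset,
                                  std::size_t count, FetchOrder order) {
  // Clamp the fetch window [begin, end) to the rows that exist.
  const std::size_t begin = std::min(offset, rows.size());
  const std::size_t end = begin + std::min(count, rows.size() - begin);
  assert(begin <= end && end <= rows.size());
  const auto first = rows.begin() + static_cast<std::ptrdiff_t>(begin);
  const auto last = rows.begin() + static_cast<std::ptrdiff_t>(end);
  if (order == FetchOrder::kSortThenFetch) {
    // Only the first `end` positions need to be in sorted order.
    std::partial_sort(rows.begin(), last, rows.end());
    return std::vector<int64_t>(first, last);
  }
  // Fetch first (in arrival order), then sort only the fetched rows.
  std::vector<int64_t> out(first, last);
  std::sort(out.begin(), out.end());
  return out;
}
```

With rows {9, 3, 7, 1, 8, 2}, an offset of 1 and a count of 2, sorting first
gives {2, 3}, while fetching first gives {3, 7}. Fetching before sorting
depends on arrival order, so with parallel execution it is only
deterministic if the input order is.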
--
This message was sent by Atlassian Jira
(v8.20.10#820010)