westonpace commented on PR #35453:
URL: https://github.com/apache/arrow/pull/35453#issuecomment-1540452046

   > Honestly, it would be nice just to be able to get a dataset to be 
considered "ordered" by its partition values. For example, if I had two tables 
partitioned by date, it would be nice to be able to join them via a sort merge 
join (or maybe asof join?) on that date column while using the inherent sorting 
of the partition values. (Having flashbacks to some frustrations in my days 
working with PySpark.)
   
   We are quite close to being able to do this.  In particular, I think we are 
very close to being able to take a query like "SELECT MEAN(x) FROM dataset 
GROUP BY part" and, instead of having aggregation be a pipeline breaker, have 
it scan values in partition order and emit values as soon as a partition is 
exhausted.  Here is what we have, and what is missing:
   
    * DONE: ExecBatch has an `index` property to determine its sorting.
    * DONE: ExecNode has an `ordering` property which says which columns its 
output is ordered by
    * DONE: The scan node has the ability to scan a dataset in order; this 
means the AsyncGenerator<ExecBatch> will emit exec batches in a deterministic 
order (filename, batch_in_file)
    * NEEDED: The scan node needs to detect if the dataset is partitioned, ask 
the dataset for fragments ordered by partition, and then assign its `ordering` 
property based on the partition columns (this sounds harder than it is)
    * DONE: The aggregate node is able to define "segment keys" which, if 
given, it will use to break up the output
    * NEEDED: The aggregate node's segment keys currently only work if run in 
single threaded mode.  We need to support parallelism.  This should be easily 
obtainable by using a sequencing queue
    * NOTE: The above assumes that the user intelligently crafts the plan (e.g. 
they know to assign "part" as a segment key when creating the aggregate node).  
In reality, this is something that would happen in a planner (it would notice 
that "part" is a partition key, figure out that it can configure the scan to 
emit in sequence, and then use part when creating the aggregate node).  Until 
such a planner exists, this feature will need to be exploited by reasonably 
intelligent users.  However, this is in line with Acero being a dumb execution 
engine without a planner.
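
To make the "emit values as soon as a partition is exhausted" idea concrete, 
here is a minimal pure-Python sketch.  It is not Acero code: `segmented_mean` 
and the sample rows are hypothetical, and it assumes the input already arrives 
grouped by the segment key `part`, which is what an ordered scan would provide.

```python
from typing import Iterable, Iterator, Optional, Tuple


def segmented_mean(rows: Iterable[Tuple[str, float]]) -> Iterator[Tuple[str, float]]:
    """Yield (part, mean of x) as soon as the rows for a partition end.

    Assumption: rows arrive grouped by `part`, so a change in `part`
    means the previous partition is exhausted and can be emitted.
    """
    current: Optional[str] = None
    total = 0.0
    count = 0
    for part, x in rows:
        if count and part != current:
            # Segment boundary: emit the finished partition, reset the state.
            yield current, total / count
            total, count = 0.0, 0
        current = part
        total += x
        count += 1
    if count:
        # Emit the final partition once the input is exhausted.
        yield current, total / count


rows = [("2023-01-01", 1.0), ("2023-01-01", 3.0), ("2023-01-02", 10.0)]
for part, mean in segmented_mean(rows):
    print(part, mean)
```

Only one running total is held at a time, so the aggregation no longer needs 
to see the whole input before producing its first result.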

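The sequencing queue mentioned for the parallel case can be sketched the same 
way.  This is an illustration of the general technique, not Acero's 
implementation: `ReorderBuffer` is a hypothetical name, it assumes batch 
indices are unique and start at 0, and it leaves out the locking a real 
multi-threaded version would need.

```python
import heapq
from typing import Any, Callable, List, Tuple


class ReorderBuffer:
    """Accept batches in any order; hand them downstream in index order."""

    def __init__(self, process: Callable[[Any], None]) -> None:
        self._process = process
        self._heap: List[Tuple[int, Any]] = []
        self._next_index = 0  # assumption: indices are 0, 1, 2, ...

    def insert(self, index: int, batch: Any) -> None:
        # Indices are assumed unique, so the heap never compares batches.
        heapq.heappush(self._heap, (index, batch))
        # Release every batch that is now contiguous with what was delivered.
        while self._heap and self._heap[0][0] == self._next_index:
            _, ready = heapq.heappop(self._heap)
            self._process(ready)
            self._next_index += 1


seen: List[str] = []
buf = ReorderBuffer(seen.append)
# Batches arrive out of order, as they might from parallel scan tasks.
for index, batch in [(2, "c"), (0, "a"), (1, "b"), (3, "d")]:
    buf.insert(index, batch)
print(seen)
```

A batch that arrives early is held back until every lower index has been 
delivered, so a downstream segmented aggregation still sees rows in partition 
order.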

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

