[ 
https://issues.apache.org/jira/browse/ARROW-16986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeroen van Straten updated ARROW-16986:
---------------------------------------
    Summary: [C++] Infer project nodes lazily to avoid unnecessary chains when 
consuming Substrait  (was: Infer project nodes lazily to avoid unnecessary 
chains when consuming Substrait)

> [C++] Infer project nodes lazily to avoid unnecessary chains when consuming 
> Substrait
> -------------------------------------------------------------------------------------
>
>                 Key: ARROW-16986
>                 URL: https://issues.apache.org/jira/browse/ARROW-16986
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++
>            Reporter: Jeroen van Straten
>            Assignee: Jeroen van Straten
>            Priority: Minor
>              Labels: substrait
>
> Naively converting everything projection-like in Substrait (ProjectRel, emit 
> clauses, and complex expressions in non-project and non-filter relations) to 
> individual project nodes would lead to conversion of optimal(-ish) Substrait 
> plans to suboptimal Acero plans. Let's use this (more-or-less optimal) 
> Substrait plan as an example (column indices are as Substrait would see them, 
> column names between parens to make it a little easier to see what's going 
> on):
>  * Read column 2 (=A), 3 (=B), and 4 (=C) from a Parquet file (emit: [2 (=A), 
> 3 (=B), 4 (=C)])
>  * Project column 1 (=B) + column 2 (=C) to get column 3 (=B+C), then drop 
> column 1 (=B) (emit: [0 (=A), 2 (=C), 3 (=B+C)])
>  * Order by column 0 (=A) - column 2 (=B+C), so effectively by A-B-C, then 
> drop column 0 (=A) (emit: [1 (=C), 2 (=B+C)])
> Converting this naively, without exceptions to collapse the emit clause of 
> ReadRel and ProjectRel, would probably yield:
>  * For the ReadRel:
>  ** Scanner that reads all columns
>  ** Project [field(2), field(3), field(4)] for the emit clause
>  * For the ProjectRel:
>  ** Project [field(0), field(1), field(2), add(field(1), field(2))] for the 
> body
>  ** Project [field(0), field(2), field(3)] for the emit clause
>  * For the SortRel:
>  ** Project [field(0), field(1), field(2), sub(field(0), field(2))] to get 
> the sort key
>  ** Order by field(3)
>  ** Project [field(0), field(1), field(2)] to revert adding the temporary 
> column
>  ** Project [field(1), field(2)] for the emit clause
> That's a lot of project nodes, when the user would probably expect something 
> like this:
>  * Scanner
>  * Project [field(2) (=A), field(4) (=C), add(field(3), field(4)) (=B+C)] to 
> drop unneeded columns and compute B+C
>  * Project [field(0) (=A), field(1) (=C), field(2) (=B+C), sub(field(0), 
> field(2)) (=A-B-C)] to compute the sort key (not collapsed with previous 
> because that would repeat the B+C subexpression)
>  * Sort on field(3)
>  * Project [field(1) (=C), field(2) (=B+C)] to drop the temporary field and 
> column A
> I suggest the following for future me (or someone else, but feel free to do 
> what you think is best in that case). For the relation FromProto functions, 
> rather than returning a Declaration, return a new class that tracks:
>  * declaration: the compute::Declaration up to this point;
>  * pending_projection: a vector of Substrait expressions that represent the 
> schema Substrait expects based on the schema returned by the 
> compute::Declaration, using empty expressions to signal that no change is 
> needed;
>  * temporaries: a vector of Substrait expressions that will be needed as 
> temporaries to express the next relation, such as complex expressions 
> encountered in a join condition.
> The class would need functions to:
>  * construct from a scanner declaration and number of columns.
>  ** post-condition: declaration is set to the given declaration.
>  ** post-condition: pending_projection.size() equals the number of columns.
>  ** post-condition: all expressions in pending_projection are simply field 
> references to their own column index.
>  ** post-condition: temporaries is empty.
>  * update the state based on an emit clause.
>  ** pre-condition: temporaries is empty (otherwise column indices will 
> desync).
>  ** if the emit clause is no-op, do nothing.
>  ** swizzle/remove elements in pending_projection based on the emit clause.
>  * update the state based on the body of a ProjectRel.
>  ** pre-condition: temporaries is empty (otherwise column indices will 
> desync).
>  ** some heuristic to determine whether to commit pending expressions into a 
> Project node before appending the incoming expressions or not [?]; being too 
> lazy may result in duplicated subexpressions, while being too eager may yield 
> unnecessary project relations. It probably pays to be eager here, unless all 
> pending expressions are any combination of only literals or field references.
>  ** append the incoming expressions to the pending expression vector.
>  * force a commit of the pending expressions to a Project node (if any are 
> pending).
>  ** post-condition: all expressions in pending_projection are simply field 
> references to their own column index.
>  ** post-condition: temporaries is empty.
>  * append a temporary expression, yielding a FieldRef for the next relation 
> to make use of; to be used whenever Substrait allows an arbitrary expression 
> in a place where Acero only supports FieldRefs.
>  ** expression is appended to the back of temporaries.
>  ** FieldRef returned simply references the index of the added temporary + 
> pending_projection.size().
>  * update the declaration by means of a (Declaration) -> Result<Declaration> 
> closure, which may make use of the FieldRefs returned by previous 
> add_temporary() calls.
>  ** commit pending expressions to a Project node (if necessary).
>  ** clear the list of temporaries.
>  ** update declaration using the closure.
>  ** post-condition: all expressions in pending_projection are simply field 
> references to their own column index.
>  ** post-condition: temporaries is empty.
>  * yield the final declaration.
>  ** pre-condition: temporaries is empty (caller is doing something weird if 
> it's not).
>  ** commit pending expressions to a Project node (if necessary).
>  ** return move(declaration).
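
To make the bookkeeping described above concrete, here is a minimal, self-contained sketch of such a class. It is not Arrow code. Expr and Decl are toy stand-ins for Substrait expressions and compute::Declaration, and the names LazyRelation, ApplyEmit, AppendProjection, AddTemporary, Update, Finish and BuildExamplePlan are hypothetical. The width_ member (the number of columns the current declaration outputs) is an assumption added so that dropping temporaries shows up as a real projection. The closure is simplified to Decl -> Decl instead of (Declaration) -> Result<Declaration> and is assumed to preserve the column count; literals are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-ins (assumptions): real code would use Substrait expressions and
// compute::Declaration instead of these types.
struct Expr {
  std::string fn;          // empty for a field reference
  std::size_t index;       // column index, meaningful only when fn is empty
  std::vector<Expr> args;  // call arguments, used only when fn is non-empty
};
using Decl = std::vector<std::string>;  // node descriptions, source first
Expr Field(std::size_t i) { return Expr{"", i, {}}; }
Expr Call(std::string fn, std::vector<Expr> args) {
  return Expr{std::move(fn), 0, std::move(args)};
}
std::string ToString(const Expr& e) {
  if (e.fn.empty()) return "field(" + std::to_string(e.index) + ")";
  std::string out = e.fn + "(";
  for (std::size_t i = 0; i < e.args.size(); ++i) out += (i ? ", " : "") + ToString(e.args[i]);
  return out + ")";
}
// Rewrites e, whose field references index into pending, so that it indexes
// into the columns of the committed declaration instead.
Expr Substitute(Expr e, const std::vector<Expr>& pending) {
  if (e.fn.empty()) return pending.at(e.index);
  for (Expr& a : e.args) a = Substitute(a, pending);
  return e;
}

class LazyRelation {  // hypothetical name
 public:
  LazyRelation(Decl scan, std::size_t num_columns)
      : declaration(std::move(scan)), width_(num_columns) { ResetPending(num_columns); }
  void ApplyEmit(const std::vector<std::size_t>& emit) {  // swizzle/remove only
    assert(temporaries.empty());
    std::vector<Expr> next;
    for (std::size_t i : emit) next.push_back(pending_projection.at(i));
    pending_projection = std::move(next);
  }
  void AppendProjection(const std::vector<Expr>& exprs) {  // ProjectRel body
    assert(temporaries.empty());
    if (!AllFieldRefs()) Commit();  // eager: avoids duplicating subexpressions
    std::vector<Expr> all = pending_projection;
    for (const Expr& e : exprs) all.push_back(Substitute(e, pending_projection));
    pending_projection = std::move(all);
  }
  Expr AddTemporary(Expr e) {  // returns the reference the next relation uses
    temporaries.push_back(std::move(e));
    return Field(pending_projection.size() + temporaries.size() - 1);
  }
  void Update(const std::function<Decl(Decl)>& fn) {
    if (!AllFieldRefs()) Commit();
    const std::size_t kept = pending_projection.size();
    std::vector<Expr> exprs = pending_projection;
    for (const Expr& t : temporaries) exprs.push_back(Substitute(t, pending_projection));
    temporaries.clear();
    Project(exprs);
    declaration = fn(std::move(declaration));
    ResetPending(kept);  // temporaries are dropped by the next commit
  }
  Decl Finish() { assert(temporaries.empty()); Commit(); return std::move(declaration); }
 private:
  bool AllFieldRefs() const {
    for (const Expr& e : pending_projection) if (!e.fn.empty()) return false;
    return true;
  }
  // Appends a project node unless exprs is the identity over all width_ columns.
  void Project(const std::vector<Expr>& exprs) {
    bool no_op = exprs.size() == width_;
    for (std::size_t i = 0; i < exprs.size(); ++i)
      if (!exprs[i].fn.empty() || exprs[i].index != i) no_op = false;
    if (no_op) return;
    declaration.push_back(ToString(Call("project", exprs)));
    width_ = exprs.size();
  }
  void Commit() { Project(pending_projection); ResetPending(pending_projection.size()); }
  void ResetPending(std::size_t n) {
    pending_projection.clear();
    for (std::size_t i = 0; i < n; ++i) pending_projection.push_back(Field(i));
  }
  Decl declaration;                                   // plan built so far
  std::vector<Expr> pending_projection, temporaries;  // as described above
  std::size_t width_;  // columns output by declaration (assumption)
};

Decl BuildExamplePlan() {  // the example plan, on a file with five columns
  LazyRelation rel(Decl{"scan"}, 5);
  rel.ApplyEmit({2, 3, 4});                                   // ReadRel emit
  rel.AppendProjection({Call("add", {Field(1), Field(2)})});  // ProjectRel body
  rel.ApplyEmit({0, 2, 3});                                   // ProjectRel emit
  Expr key = rel.AddTemporary(Call("sub", {Field(0), Field(2)}));
  rel.Update([&](Decl d) { d.push_back("order_by " + ToString(key)); return d; });
  rel.ApplyEmit({1, 2});  // SortRel emit
  return rel.Finish();
}
```

BuildExamplePlan() walks the example from the description and ends up with scan, project(field(2), field(4), add(field(3), field(4))), project(field(0), field(1), field(2), sub(field(0), field(2))), order_by field(3) and project(field(1), field(2)): three project nodes rather than the six of the naive conversion.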



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
