[ https://issues.apache.org/jira/browse/ARROW-16986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jeroen van Straten updated ARROW-16986:
---------------------------------------
Summary: [C++] Infer project nodes lazily to avoid unnecessary chains when
consuming Substrait (was: Infer project nodes lazily to avoid unnecessary
chains when consuming Substrait)
> [C++] Infer project nodes lazily to avoid unnecessary chains when consuming
> Substrait
> -------------------------------------------------------------------------------------
>
> Key: ARROW-16986
> URL: https://issues.apache.org/jira/browse/ARROW-16986
> Project: Apache Arrow
> Issue Type: Improvement
> Components: C++
> Reporter: Jeroen van Straten
> Assignee: Jeroen van Straten
> Priority: Minor
> Labels: substrait
>
> Naively converting everything projection-like in Substrait (ProjectRel, emit
> clauses, and complex expressions in relations other than project and filter)
> into individual project nodes would turn optimal(-ish) Substrait plans into
> suboptimal Acero plans. Take this (more or less optimal) Substrait plan as an
> example (column indices are as Substrait would see them; column names are
> given in parentheses to make it easier to follow what's going on):
> * Read column 2 (=A), 3 (=B), and 4 (=C) from a Parquet file (emit: [2 (=A),
> 3 (=B), 4 (=C)])
> * Project column 1 (=B) + column 2 (=C) to get column 3 (=B+C), then drop
> column 1 (=B) (emit: [0 (=A), 2 (=C), 3 (=B+C)])
> * Order by column 0 (=A) - column 2 (=B+C), so effectively by A-B-C, then
> drop column 0 (=A) (emit: [1 (=C), 2 (=B+C)])
> Converting this naively, without special cases that collapse the emit clauses
> of ReadRel and ProjectRel into the nodes generated for them, would probably
> yield:
> * For the ReadRel:
> ** Scanner that reads all columns
> ** Project [field(2), field(3), field(4)] for the emit clause
> * For the ProjectRel:
> ** Project [field(0), field(1), field(2), add(field(1), field(2))] for the
> body
> ** Project [field(0), field(2), field(3)] for the emit clause
> * For the SortRel:
> ** Project [field(0), field(1), field(2), sub(field(0), field(2))] to get
> the sort key
> ** Order by field(3)
> ** Project [field(0), field(1), field(2)] to revert adding the temporary
> column
> ** Project [field(1), field(2)] for the emit clause
> That's six project nodes, where the user would probably expect something
> like this:
> * Scanner
> * Project [field(2) (=A), field(4) (=C), add(field(3), field(4)) (=B+C)] to
> drop unneeded columns and compute B+C
> * Project [field(0) (=A), field(1) (=C), field(2) (=B+C), sub(field(0),
> field(2)) (=A-B-C)] to compute the sort key (not merged with the previous
> project, because that would compute the B+C subexpression twice)
> * Sort on field(3)
> * Project [field(1) (=C), field(2) (=B+C)] to drop the temporary field and
> column A
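The two chains above can be checked mechanically. The following C++ sketch is a toy model, not Arrow code: columns are plain strings, `Expr`, `Node`, `Run`, `NaivePlan` and `CompactPlan` are hypothetical names, the sort node only records which column it orders by, and the Parquet file's first two columns are given the made-up names x0 and x1.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Toy model (hypothetical, not Arrow's API): each column is named by a
// symbolic string, and a project node is a list of expressions over its input.
struct Expr {
  int field = -1;          // >= 0: reference to that input column
  std::string op;          // otherwise: binary operator, "+" or "-"
  std::vector<Expr> args;  // the two operands of op
};
Expr F(int i) { return Expr{i, "", {}}; }
Expr Call(std::string op, Expr a, Expr b) {
  return Expr{-1, std::move(op), {std::move(a), std::move(b)}};
}

// Renders an expression in terms of the symbolic input column names.
std::string Eval(const Expr& e, const std::vector<std::string>& cols) {
  if (e.field >= 0) return cols.at(e.field);
  assert(e.args.size() == 2);
  return "(" + Eval(e.args[0], cols) + e.op + Eval(e.args[1], cols) + ")";
}

struct Node {
  bool is_project = false;
  std::vector<Expr> exprs;  // project nodes: output expressions
  int sort_key = -1;        // sort nodes: index of the key column
};
Node Project(std::vector<Expr> exprs) { return Node{true, std::move(exprs), -1}; }
Node Sort(int key) { return Node{false, {}, key}; }

struct PlanResult {
  std::vector<std::string> columns;  // final output columns
  std::string sort_key;              // what the sort node ordered by
  int project_nodes = 0;
};

// Pushes the scan's column names through the chain; sorts keep the columns.
PlanResult Run(std::vector<std::string> cols, const std::vector<Node>& plan) {
  PlanResult r;
  for (const Node& n : plan) {
    if (!n.is_project) { r.sort_key = cols.at(n.sort_key); continue; }
    std::vector<std::string> out;
    for (const Expr& e : n.exprs) out.push_back(Eval(e, cols));
    cols = std::move(out);
    ++r.project_nodes;
  }
  r.columns = cols;
  return r;
}

// The naive chain: one project per emit clause, body, and sort-key step.
std::vector<Node> NaivePlan() {
  return {Project({F(2), F(3), F(4)}),                         // ReadRel emit
          Project({F(0), F(1), F(2), Call("+", F(1), F(2))}),  // ProjectRel body
          Project({F(0), F(2), F(3)}),                         // ProjectRel emit
          Project({F(0), F(1), F(2), Call("-", F(0), F(2))}),  // sort key
          Sort(3),
          Project({F(0), F(1), F(2)}),                         // drop temporary
          Project({F(1), F(2)})};                              // SortRel emit
}

// The chain the user would expect.
std::vector<Node> CompactPlan() {
  return {Project({F(2), F(4), Call("+", F(3), F(4))}),
          Project({F(0), F(1), F(2), Call("-", F(0), F(2))}),
          Sort(3),
          Project({F(1), F(2)})};
}
```

Run on the columns {x0, x1, A, B, C}, both chains end with [C, (B+C)] and sort on (A-(B+C)), but the naive chain uses six project nodes against three for the compact one.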
> I suggest the following for future me (or someone else, but feel free to do
> what you think is best in that case). For the relation FromProto functions,
> rather than returning a Declaration, return a new class that tracks:
> * declaration: the compute::Declaration up to this point;
> * pending_projection: a vector of Substrait expressions that represent the
> schema Substrait expects based on the schema returned by the
> compute::Declaration, using empty expressions to signal that no change is
> needed;
> * temporaries: a vector of Substrait expressions that will be needed as
> temporaries to express the next relation, such as complex expressions
> encountered in a join condition.
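As a rough illustration of that state, the sketch below uses hypothetical stand-in types (`SubstraitExpr`, `Declaration`, `DeclarationState`) instead of the real substrait::Expression and compute::Declaration, and assumes the declaration's output width is tracked explicitly as `num_columns`. An empty `std::optional` entry stands for "column i, unchanged", and the hypothetical `NeedsProject()` reports whether committing the state would require an actual project node.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-ins for substrait::Expression and compute::Declaration.
struct SubstraitExpr {
  int field = -1;    // >= 0: plain reference to that input column
  std::string call;  // otherwise: opaque call, e.g. "add(field(3), field(4))"
};
struct Declaration {
  std::vector<std::string> nodes;  // node labels, in order
  int num_columns = 0;             // output width of the last node
};

// Sketch of the proposed state; DeclarationState is a hypothetical name.
struct DeclarationState {
  Declaration declaration;
  // One entry per column Substrait expects; std::nullopt at index i means
  // "column i of the declaration's output, unchanged".
  std::vector<std::optional<SubstraitExpr>> pending_projection;
  std::vector<SubstraitExpr> temporaries;

  // True if committing the pending state would need an actual project node.
  bool NeedsProject() const {
    if (!temporaries.empty()) return true;
    if (pending_projection.size() != static_cast<std::size_t>(declaration.num_columns)) {
      return true;  // columns were dropped or appended
    }
    for (std::size_t i = 0; i < pending_projection.size(); ++i) {
      const std::optional<SubstraitExpr>& e = pending_projection[i];
      if (e && e->field != static_cast<int>(i)) return true;  // swizzle or call
    }
    return false;
  }
};
```

A pending projection of the right length whose entries are all empty or self-references is the case in which no project node needs to be emitted.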
> The class would need functions to:
> * construct from a scanner declaration and number of columns.
> ** post-condition: declaration is set to the given declaration.
> ** post-condition: pending_projection.size() equals the number of columns.
> ** post-condition: all expressions in pending_projection are simply field
> references to their own column index.
> ** post-condition: temporaries is empty.
> * update the state based on an emit clause.
> ** pre-condition: temporaries is empty (otherwise column indices will
> desync).
> ** if the emit clause is no-op, do nothing.
> ** swizzle/remove elements in pending_projection based on the emit clause.
> * update the state based on the body of a ProjectRel.
> ** pre-condition: temporaries is empty (otherwise column indices will
> desync).
> ** use some heuristic [?] to decide whether to commit the pending
> expressions to a Project node before appending the incoming ones: being too
> lazy may duplicate subexpressions, while being too eager may yield
> unnecessary Project nodes. It probably pays to be eager here, unless every
> pending expression is a literal or a field reference.
> ** append the incoming expressions to the pending expression vector.
> * force a commit of the pending expressions to a Project node (if any are
> pending).
> ** post-condition: all expressions in pending_projection are simply field
> references to their own column index.
> ** post-condition: temporaries is empty.
> * append a temporary expression, yielding a FieldRef for the next relation
> to make use of; to be used whenever Substrait allows an arbitrary expression
> in a place where Acero only supports FieldRefs.
> ** expression is appended to the back of temporaries.
> ** the returned FieldRef simply references pending_projection.size() plus
> the index of the added temporary.
> * update the declaration by means of a (Declaration) -> Result<Declaration>
> closure, which may make use of the FieldRefs returned by previous
> add_temporary() calls.
> ** commit pending expressions to a Project node (if necessary).
> ** clear the list of temporaries.
> ** update declaration using the closure.
> ** post-condition: all expressions in pending_projection are simply field
> references to their own column index.
> ** post-condition: temporaries is empty.
> * yield the final declaration.
> ** pre-condition: temporaries is empty (caller is doing something weird if
> it's not).
> ** commit pending expressions to a Project node (if necessary).
> ** return std::move(declaration).
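Putting those operations together, here is a condensed C++ sketch of such a class. Everything in it is hypothetical: `LazyDeclaration`, `Expr`, and the method names are illustrative, the declaration is reduced to a list of node labels plus its column count, pending entries are stored as explicit field references rather than empty expressions, and the closure returns a plain Declaration instead of a Result and is assumed to keep the column layout (as a sort or filter does). Temporaries are stored against Substrait's view of the columns and inlined at commit time; if that would duplicate a pending call, the pending expressions are committed first, which is one possible reading of the heuristic above.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy stand-ins for substrait::Expression and compute::Declaration.
struct Expr {
  enum Kind { kField, kLiteral, kCall } kind = kField;
  int value = 0;           // kField: column index; kLiteral: literal value
  std::string fn;          // kCall: function name
  std::vector<Expr> args;  // kCall: arguments
};
Expr Field(int i) { return {Expr::kField, i, "", {}}; }
Expr Call(std::string fn, std::vector<Expr> args) { return {Expr::kCall, 0, std::move(fn), std::move(args)}; }
std::string ToString(const Expr& e) {
  if (e.kind == Expr::kField) return "field(" + std::to_string(e.value) + ")";
  if (e.kind == Expr::kLiteral) return std::to_string(e.value);
  std::string s = e.fn + "(", sep;
  for (const Expr& a : e.args) { s += sep + ToString(a); sep = ", "; }
  return s + ")";
}
struct Declaration { std::vector<std::string> nodes; int num_columns = 0; };  // labels, output width

// Hypothetical class following the proposal above; all names are illustrative.
class LazyDeclaration {
 public:
  LazyDeclaration(Declaration scan, int num_columns) : decl_(std::move(scan)) {
    decl_.num_columns = num_columns;
    ResetPending(num_columns);  // identity projection, no temporaries
  }
  // Emit clause: output j is current column emit[j]; an identity emit changes nothing.
  void ApplyEmit(const std::vector<int>& emit) {
    assert(temporaries_.empty());  // otherwise column indices desync
    std::vector<Expr> next;
    for (int i : emit) next.push_back(pending_.at(i));
    pending_ = std::move(next);
  }
  // ProjectRel body: commit first unless every pending entry is a field or literal.
  void ApplyProjectBody(const std::vector<Expr>& exprs) {
    assert(temporaries_.empty());
    if (HasCall(pending_)) Commit();
    for (const Expr& e : exprs) pending_.push_back(Inline(e));
  }
  // Returns the column index under which the next relation can refer to e.
  int AddTemporary(Expr e) {
    temporaries_.push_back(std::move(e));
    return static_cast<int>(pending_.size() + temporaries_.size() - 1);
  }
  // Commits, then applies fn, which is assumed to keep the column layout.
  void Update(const std::function<Declaration(Declaration)>& fn) { Commit(); decl_ = fn(std::move(decl_)); }
  Declaration Finish() {
    assert(temporaries_.empty());  // caller is doing something weird otherwise
    Commit();
    return std::move(decl_);
  }
  // Materializes pending expressions and temporaries as a project node if needed.
  void Commit() {
    if (!temporaries_.empty() && HasCall(pending_)) {
      // Commit pending_ on its own first so temporaries don't repeat its calls.
      std::vector<Expr> temps = std::exchange(temporaries_, {});
      Commit();
      temporaries_ = std::move(temps);
    }
    if (IsIdentity() && temporaries_.empty()) return;
    std::vector<Expr> out = pending_;
    for (const Expr& t : temporaries_) out.push_back(Inline(t));
    std::string label = "project[", sep;
    for (const Expr& e : out) { label += sep + ToString(e); sep = ", "; }
    decl_.nodes.push_back(label + "]");
    decl_.num_columns = static_cast<int>(out.size());
    ResetPending(static_cast<int>(pending_.size()));  // temporaries stay in decl_ only
    temporaries_.clear();
  }
 private:
  static bool HasCall(const std::vector<Expr>& v) {
    return std::any_of(v.begin(), v.end(), [](const Expr& e) { return e.kind == Expr::kCall; });
  }
  bool IsIdentity() const {
    if (static_cast<int>(pending_.size()) != decl_.num_columns) return false;
    for (int i = 0; i < static_cast<int>(pending_.size()); ++i)
      if (pending_[i].kind != Expr::kField || pending_[i].value != i) return false;
    return true;
  }
  // Rewrites e from Substrait's view of the columns to decl_'s output columns.
  Expr Inline(const Expr& e) const {
    if (e.kind == Expr::kField) return pending_.at(e.value);
    Expr out = e;
    for (Expr& a : out.args) a = Inline(a);
    return out;
  }
  void ResetPending(int n) { pending_.clear(); for (int i = 0; i < n; ++i) pending_.push_back(Field(i)); }
  Declaration decl_;
  std::vector<Expr> pending_;      // Substrait's view, in terms of decl_'s output
  std::vector<Expr> temporaries_;  // in terms of Substrait's view
};
```

Fed the example plan from the top of the issue (emits [2, 3, 4], [0, 2, 3] and [1, 2], the B+C body, and A-(B+C) as a temporary sort key on a five-column scan), this sketch produces the scan, project[field(2), field(4), add(field(3), field(4))], project[field(0), field(1), field(2), sub(field(0), field(2))], the sort, and project[field(1), field(2)], i.e. the plan the user would expect.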