alamb commented on code in PR #19538:
URL: https://github.com/apache/datafusion/pull/19538#discussion_r2666153648


##########
datafusion/physical-expr-common/src/physical_expr.rs:
##########
@@ -430,6 +430,20 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
     fn is_volatile_node(&self) -> bool {
         false
     }
+
+    /// Returns true if this expression is trivial (cheap to evaluate).
+    ///
+    /// Trivial expressions include:
+    /// - Column references
+    /// - Literal values
+    /// - Struct field access via `get_field`
+    /// - Nested combinations of field accessors (e.g., `col['a']['b']`)
+    ///
+    /// This is used to identify expressions that are cheap to duplicate or
+    /// don't benefit from caching/partitioning optimizations.

Review Comment:
   Maybe also document that trivial expressions will be pushed below filters, so if they do per-row work, setting `is_trivial` may slow things down.
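   The tradeoff above can be sketched numerically (illustrative Rust only, not DataFusion code; `rows_evaluated` is a made-up helper):

   ```rust
   // Illustrative only: compare how many rows an expression touches when it
   // is evaluated below vs. above a 1%-selective filter.
   fn rows_evaluated(total_rows: u64, selectivity: f64, below_filter: bool) -> u64 {
       if below_filter {
           total_rows // pushed below the filter: sees every input row
       } else {
           (total_rows as f64 * selectivity) as u64 // sees only surviving rows
       }
   }

   fn main() {
       // Per-row work marked trivial and pushed below the filter runs 100x
       // more often in this scenario.
       assert_eq!(rows_evaluated(1_000_000, 0.01, true), 1_000_000);
       assert_eq!(rows_evaluated(1_000_000, 0.01, false), 10_000);
   }
   ```

   For constant-time expressions like column or struct-field access this extra work is negligible, which is exactly why the trivial set is restricted to them.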



##########
datafusion/expr/src/udf.rs:
##########
@@ -846,6 +851,18 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
     fn documentation(&self) -> Option<&Documentation> {
         None
     }
+
+    /// Returns true if this function is trivial (cheap to evaluate).

Review Comment:
   I suggest a good rule of thumb here: the function takes constant time per RecordBatch (i.e. its cost doesn't depend on the number of rows in the batch). Struct field access and column references have this property, but most other functions don't.
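   That rule of thumb could be sketched as follows (a standalone toy, not the real DataFusion `PhysicalExpr`/`ScalarUDFImpl` API; this `Expr` enum is hypothetical):

   ```rust
   // Hypothetical toy expression tree, only to illustrate the rule of thumb:
   // an expression is "trivial" when its cost per RecordBatch is O(1),
   // i.e. independent of the number of rows in the batch.
   enum Expr {
       Column(String),              // just picks an existing array from the batch
       Literal(i64),                // constant, independent of row count
       GetField(Box<Expr>, String), // struct field access: picks a child array
       ScalarFn(String, Vec<Expr>), // general function: typically per-row work
   }

   fn is_trivial(expr: &Expr) -> bool {
       match expr {
           Expr::Column(_) | Expr::Literal(_) => true,
           // Field access is trivial only if its input is trivial, so nested
           // accessors like col['a']['b'] still qualify.
           Expr::GetField(inner, _) => is_trivial(inner),
           // Anything that computes per row (e.g. upper, a + b) is not.
           Expr::ScalarFn(_, _) => false,
       }
   }

   fn main() {
       let nested = Expr::GetField(
           Box::new(Expr::GetField(Box::new(Expr::Column("s".into())), "a".into())),
           "b".into(),
       );
       assert!(is_trivial(&nested));
       let f = Expr::ScalarFn("upper".into(), vec![Expr::Column("a".into())]);
       assert!(!is_trivial(&f));
   }
   ```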



##########
datafusion/sqllogictest/test_files/unnest.slt:
##########
@@ -659,8 +659,8 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[__unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1],depth=2)@0 as UNNEST(UNNEST(UNNEST(recursive_unnest_table.column3)[c1])), column3@1 as column3]
 02)--UnnestExec
-03)----ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
 05)--------UnnestExec
 06)----------ProjectionExec: expr=[column3@0 as __unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3]
 07)------------DataSourceExec: partitions=1, partition_sizes=[1]

Review Comment:
   I think we should add some more specific testing for pushdown into parquet scans.
   
   Perhaps in `parquet_pushdown.slt`.
   
   Here are some suggestions:
   
   setup
   ```sql
   > copy (select column1 as a, column2 as b, column3 as s from values (1, 2, {foo: 'bar'}), (10, 20, {foo: 'baz'})) to 'foo.parquet';
   +-------+
   | count |
   +-------+
   | 2     |
   +-------+
   1 row(s) fetched.
   Elapsed 0.019 seconds.
   
   > select * from 'foo.parquet';
   +----+----+------------+
   | a  | b  | s          |
   +----+----+------------+
   | 1  | 2  | {foo: bar} |
   | 10 | 20 | {foo: baz} |
   +----+----+------------+
   2 row(s) fetched.
   Elapsed 0.014 seconds.
   ```
   
   Then demonstrate constant pushdown
   ```sql
   > explain format indent select a, 1 from 'foo.parquet';
   +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                                          |
   +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Projection: foo.parquet.a, Int64(1)                                                                                                           |
   |               |   TableScan: foo.parquet projection=[a]                                                                                                       |
   | physical_plan | DataSourceExec: file_groups={1 group: [[Users/andrewlamb/Software/datafusion/foo.parquet]]}, projection=[a, 1 as Int64(1)], file_type=parquet |
   |               |                                                                                                                                               |
   +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
   2 row(s) fetched.
   Elapsed 0.008 seconds.
   ```
   
   Also show get_field pushdown
   
   ```sql
   > explain format indent select a, s['foo'] from 'foo.parquet';
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                               |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Projection: foo.parquet.a, get_field(foo.parquet.s, 
Utf8("foo"))                                                                    
                                      |
   |               |   TableScan: foo.parquet projection=[a, s]                 
                                                                                
                               |
   | physical_plan | DataSourceExec: file_groups={1 group: 
[[Users/andrewlamb/Software/datafusion/foo.parquet]]}, projection=[a, 
get_field(s@2, foo) as foo.parquet.s[foo]], file_type=parquet |
   |               |                                                            
                                                                                
                               |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   2 row(s) fetched.
   Elapsed 0.013 seconds.
   ```



##########
datafusion/core/tests/sql/explain_analyze.rs:
##########
@@ -997,10 +997,9 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> {
     SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
       RecursiveQueryExec: name=number_series, is_distinct=false
         CoalescePartitionsExec
-          ProjectionExec: expr=[id@0 as id, 1 as level]
-            FilterExec: id@0 = 1
-              RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
-                DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
+          FilterExec: id@0 = level@1
+            RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
+              DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id, 1 as level], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]

Review Comment:
   It is somewhat interesting that this materializes the constant in the scan. This is probably ok, but it does mean the constant may now get carried as a constant column in each record batch up through many operators in the plan 🤔 



##########
datafusion/functions/src/core/getfield.rs:
##########
@@ -499,6 +499,10 @@ impl ScalarUDFImpl for GetFieldFunc {
     fn documentation(&self) -> Option<&Documentation> {
         self.doc()
     }
+
+    fn is_trivial(&self) -> bool {

Review Comment:
   I recommend adding some comments explaining the rationale -- namely, that this allows these accesses to be pushed down into scans.
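   For example, something along these lines (a sketch with stand-in types, not the actual `GetFieldFunc` implementation):

   ```rust
   // Sketch of a self-documenting `is_trivial` override; the trait and type
   // here are stand-ins, not the real DataFusion `ScalarUDFImpl`/`GetFieldFunc`.
   trait ScalarUdfSketch {
       fn is_trivial(&self) -> bool {
           false
       }
   }

   struct GetFieldSketch;

   impl ScalarUdfSketch for GetFieldSketch {
       /// `get_field` on a struct only selects an existing child array, which
       /// is constant time per RecordBatch regardless of row count. Marking it
       /// trivial allows these accesses to be pushed down into scans (and
       /// below filters) without risking duplicated per-row work.
       fn is_trivial(&self) -> bool {
           true
       }
   }

   fn main() {
       assert!(GetFieldSketch.is_trivial());
   }
   ```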



##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -255,18 +255,10 @@ impl ExecutionPlan for ProjectionExec {
     }
 
     fn benefits_from_input_partitioning(&self) -> Vec<bool> {
-        let all_simple_exprs =
-            self.projector
-                .projection()
-                .as_ref()
-                .iter()
-                .all(|proj_expr| {
-                    proj_expr.expr.as_any().is::<Column>()
-                        || proj_expr.expr.as_any().is::<Literal>()
-                });
-        // If expressions are all either column_expr or Literal, then all 
computations in this projection are reorder or rename,
-        // and projection would not benefit from the repartition, 
benefits_from_input_partitioning will return false.
-        vec![!all_simple_exprs]
+        // If expressions are all trivial (columns, literals, or field 
accessors),
+        // then all computations in this projection are reorder or rename,
+        // and projection would not benefit from the repartition.
+        vec![!self.projection_expr().is_trivial()]

Review Comment:
   This is a very nice simplification and a good illustration of the power of the `is_trivial` API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

