adriangb commented on code in PR #17090:
URL: https://github.com/apache/datafusion/pull/17090#discussion_r2279678882


##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -217,8 +234,25 @@ impl DynamicFilterPhysicalExpr {
         current.expr = new_expr;
         // Increment the generation to indicate that the expression has changed.
         current.generation += 1;
+        // Relaxed ordering is sufficient as `key_count` is only used for
+        // observability and does not synchronize with other data.

Review Comment:
   observability is super vague - what is it actually used for?
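
Relaxed ordering has a narrow guarantee. A counter that is only read for reporting, and is never used to decide whether other shared state is ready, can use `Ordering::Relaxed`. The following is a minimal sketch. Its names (`FilterState`, `update`, `key_count`, `count_concurrently`) are hypothetical and are not the PR's code:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in: the expression lives behind a lock, while the key
/// count is tracked separately and only ever read for reporting.
pub struct FilterState {
    /// (expression, generation); readers synchronize on this lock.
    inner: RwLock<(String, u64)>,
    /// Diagnostic counter; its value never gates access to `inner`.
    key_count: AtomicUsize,
}

impl FilterState {
    pub fn new() -> Self {
        Self {
            inner: RwLock::new((String::from("true"), 0)),
            key_count: AtomicUsize::new(0),
        }
    }

    /// Replace the expression and record how many keys it was built from.
    pub fn update(&self, new_expr: &str, keys: usize) {
        let mut guard = self.inner.write().unwrap();
        guard.0 = new_expr.to_string();
        guard.1 += 1; // bump the generation under the lock
        // Relaxed still makes the store atomic (no torn values); it just
        // promises nothing about the order of other memory operations.
        self.key_count.store(keys, Ordering::Relaxed);
    }

    pub fn key_count(&self) -> usize {
        self.key_count.load(Ordering::Relaxed)
    }

    pub fn generation(&self) -> u64 {
        self.inner.read().unwrap().1
    }
}

/// Relaxed read-modify-write operations are still atomic, so concurrent
/// increments are never lost; joining the threads makes the total visible.
pub fn count_concurrently(threads: usize, per_thread: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    counter.load(Ordering::Relaxed)
}
```

Suppose a reader loaded the counter and then relied on the matching expression already being visible. In that case the store would need `Ordering::Release` and the load `Ordering::Acquire`, or the count would have to live under the same lock as the expression.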



##########
datafusion/common/src/join_type.rs:
##########
@@ -111,6 +111,63 @@ impl JoinType {
                 | JoinType::RightAnti
         )
     }
+    /// Returns true if the left side of this join preserves its input rows
+    /// for filters applied *after* the join.
+    #[inline]
+    pub const fn preserves_left_for_output_filters(self) -> bool {

Review Comment:
   I arrived at this same refactor in 
https://github.com/apache/datafusion/pull/17153 - I think it's a good one. Can 
we pull this out into its own PR?



##########
datafusion/common/src/join_type.rs:
##########
@@ -111,6 +111,63 @@ impl JoinType {
                 | JoinType::RightAnti
         )
     }
+    /// Returns true if the left side of this join preserves its input rows
+    /// for filters applied *after* the join.
+    #[inline]

Review Comment:
   I see a lot of use of `#[inline]`. My understanding is that, without specific evidence that it helps, it may actually hurt performance in some cases. It's best not to sprinkle it around unless the benefit is obvious or can be measured.



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -404,9 +404,11 @@ impl TopK {
             .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::Or, b)));
 
         if let Some(predicate) = dynamic_predicate {
-            if !predicate.eq(&lit(true)) {
-                filter.update(predicate)?;
-            }
+            filter.update(predicate, self.heap.len())?;
+        } else {
+            // Even when the dynamic predicate is a tautology we still update

Review Comment:
   tautology is a very fancy word - would it be possible to give an explanation 
of what the term means?
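
A tautology is a predicate that is true for every row, so applying it filters nothing. The simplest case is the literal `true`, which is what the removed `predicate.eq(&lit(true))` check looked for. The toy sketch below uses a hypothetical `Expr` enum (not DataFusion's `PhysicalExpr`). It shows two things:

- OR-combination via `reduce`, which yields `None` when there is nothing to combine.
- A conservative, syntactic tautology check.

```rust
/// Hypothetical toy expression type, for illustration only.
#[derive(Clone, Debug, PartialEq)]
pub enum Expr {
    Lit(bool),
    /// `column > value`
    Gt(&'static str, i64),
    Or(Box<Expr>, Box<Expr>),
}

impl Expr {
    /// Conservative, purely syntactic check: returns true only when the
    /// predicate is obviously always true. It does not try to prove that
    /// arbitrary predicates are tautologies.
    pub fn is_trivially_true(&self) -> bool {
        match self {
            Expr::Lit(value) => *value,
            Expr::Gt(..) => false,
            // `a OR true` is true no matter what `a` evaluates to.
            Expr::Or(a, b) => a.is_trivially_true() || b.is_trivially_true(),
        }
    }
}

/// OR together a list of predicates; `None` when the list is empty.
pub fn or_all(preds: Vec<Expr>) -> Option<Expr> {
    preds
        .into_iter()
        .reduce(|a, b| Expr::Or(Box::new(a), Box::new(b)))
}
```

SQL uses three-valued logic, so something like `x > 5 OR x <= 5` is not actually a tautology. When `x` is NULL it evaluates to NULL, and the row is filtered out. A syntactic check like this one therefore errs on the safe side.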



##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -48,6 +51,9 @@ pub struct DynamicFilterPhysicalExpr {
     remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
     /// The source of dynamic filters.
     inner: Arc<RwLock<Inner>>,
+    /// Number of keys currently contained in this dynamic filter.
+    /// Uses relaxed atomics as this counter is for diagnostics only.
+    key_count: Arc<AtomicUsize>,

Review Comment:
   If I'm just reading this module I'd have no idea what `key_count` is. Is it 
a join specific thing? What is a "key" in this context? If we actually do need 
this I propose we add it in an isolated PR.



##########
datafusion/physical-optimizer/src/filter_pushdown.rs:
##########
@@ -708,6 +711,7 @@ impl<T: Clone> FilteredVec<T> {
     }
 }
 
+#[inline]

Review Comment:
   Same comment about adding `#[inline]` without any obvious justification.



##########
dev/changelog/50.0.0.md:
##########
@@ -0,0 +1,39 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache DataFusion 50.0.0 Changelog
+
+This release is under active development.
+
+- **Breaking:** `DynamicFilterPhysicalExpr::update` now requires an extra
+  `key_count` argument.
+- Enable dynamic filter pushdown for left, right, semi, anti, and mark joins
+  [#16445](https://github.com/apache/datafusion/pull/16445) (adriangb). Mark joins
+  push filters to the side opposite the preserved input (`dynamic_filter_side`; see
+  [tests](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/hash_join.rs#L2033-L2049)). Formats without predicate pushdown (CSV/JSON) will not benefit.

Review Comment:
   Actually, if they have statistics they will still benefit (though those two formats generally don't have statistics).
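
Statistics help even when a reader cannot evaluate predicates while decoding rows. If a column's min/max statistics show that no row can satisfy the filter, the reader can skip the entire file. The sketch below handles a filter of the form `lo <= col AND col <= hi`. The names `ColumnStats`, `can_skip`, and `prune` are hypothetical:

```rust
/// Hypothetical min/max statistics for one integer column of a file.
#[derive(Clone, Copy, Debug)]
pub struct ColumnStats {
    pub min: i64,
    pub max: i64,
}

/// True if no row in a file with these statistics can satisfy
/// `lo <= col AND col <= hi`.
pub fn can_skip(stats: Option<ColumnStats>, lo: i64, hi: i64) -> bool {
    match stats {
        // The file's [min, max] range lies entirely outside [lo, hi].
        Some(s) => s.max < lo || s.min > hi,
        // Without statistics nothing can be proven, so the file must be read.
        None => false,
    }
}

/// Names of the files that might contain matching rows.
pub fn prune(
    files: &[(&'static str, Option<ColumnStats>)],
    lo: i64,
    hi: i64,
) -> Vec<&'static str> {
    files
        .iter()
        .filter(|(_, stats)| !can_skip(*stats, lo, hi))
        .map(|(name, _)| *name)
        .collect()
}
```

Files without statistics are always kept. This matches the caveat in the comment: CSV and JSON sources generally lack statistics, so they usually see no benefit.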



##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2664,23 +2664,23 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
     assert_snapshot!(
         pretty_format_batches(&sql_results).unwrap(),
         @r"
-    +---------------+------------------------------------------------------------------------------------------------------------------------+
-    | plan_type     | plan                                                                                                                   |
-    +---------------+------------------------------------------------------------------------------------------------------------------------+
-    | logical_plan  | LeftSemi Join: CAST(t1.a AS Int64) = __correlated_sq_1.count(*)                                                        |
-    |               |   TableScan: t1 projection=[a, b]                                                                                      |
-    |               |   SubqueryAlias: __correlated_sq_1                                                                                     |
-    |               |     Projection: count(Int64(1)) AS count(*)                                                                            |
-    |               |       Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]                                                                |
-    |               |         TableScan: t2 projection=[]                                                                                    |
-    | physical_plan | CoalesceBatchesExec: target_batch_size=8192                                                                            |
-    |               |   HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] |
-    |               |     ProjectionExec: expr=[4 as count(*)]                                                                               |
-    |               |       PlaceholderRowExec                                                                                               |
-    |               |     ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0 AS Int64) as CAST(t1.a AS Int64)]                               |
-    |               |       DataSourceExec: partitions=1, partition_sizes=[1]                                                                |
-    |               |                                                                                                                        |
-    +---------------+------------------------------------------------------------------------------------------------------------------------+
+    +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+
+    | plan_type     | plan                                                                                                                                                  |
+    +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+
+    | logical_plan  | LeftSemi Join: CAST(t1.a AS Int64) = __correlated_sq_1.count(*)                                                                                       |
+    |               |   TableScan: t1 projection=[a, b]                                                                                                                     |
+    |               |   SubqueryAlias: __correlated_sq_1                                                                                                                    |
+    |               |     Projection: count(Int64(1)) AS count(*)                                                                                                           |
+    |               |       Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]                                                                                               |
+    |               |         TableScan: t2 projection=[]                                                                                                                   |
+    | physical_plan | CoalesceBatchesExec: target_batch_size=8192                                                                                                           |
+    |               |   HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1], probe_side=Left, probe_keys=0 |

Review Comment:
   Can we pull out the addition of these fields to the debug output into its 
own PR?



##########
dev/changelog/50.0.0.md:
##########
@@ -0,0 +1,39 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache DataFusion 50.0.0 Changelog
+
+This release is under active development.
+
+- **Breaking:** `DynamicFilterPhysicalExpr::update` now requires an extra
+  `key_count` argument.
+- Enable dynamic filter pushdown for left, right, semi, anti, and mark joins
+  [#16445](https://github.com/apache/datafusion/pull/16445) (adriangb). Mark joins
+  push filters to the side opposite the preserved input (`dynamic_filter_side`; see
+  [tests](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/hash_join.rs#L2033-L2049)). Formats without predicate pushdown (CSV/JSON) will not benefit.
+  Full joins and non‑equi (range or composite) predicates are not yet supported;
+  see [#7955](https://github.com/apache/datafusion/issues/7955). Dynamic filters
+  add planning overhead for high-cardinality keys; disable via:

Review Comment:
   Could you help me understand where the overhead comes from at the moment? Is 
it measurable at all?
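
The overhead question depends partly on what shape of predicate the join derives from its build side. This sketch assumes a range shape; it is an assumption, not a description of the PR:

- A range (min/max bounds on the join key) needs a single pass over the build-side keys, and the resulting predicate has the same size however many distinct keys there are.
- A membership list of keys, by contrast, grows with cardinality.

A sketch of the range case, with hypothetical `key_bounds` and `bounds_predicate` helpers:

```rust
/// Inclusive (min, max) bounds over build-side join keys, in one pass.
/// Returns `None` when the build side is empty.
pub fn key_bounds(keys: &[i64]) -> Option<(i64, i64)> {
    let mut iter = keys.iter().copied();
    let first = iter.next()?;
    Some(iter.fold((first, first), |(lo, hi), k| (lo.min(k), hi.max(k))))
}

/// Render the bounds as a probe-side predicate. Its size is the same no
/// matter how many distinct keys the build side contained.
pub fn bounds_predicate(column: &str, keys: &[i64]) -> String {
    match key_bounds(keys) {
        Some((lo, hi)) => format!("{column} >= {lo} AND {column} <= {hi}"),
        // Assumes an inner join: with an empty build side no probe row matches.
        None => "false".to_string(),
    }
}
```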



##########
docs/source/library-user-guide/join-preservation.md:
##########
@@ -0,0 +1,54 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Join preservation
+
+Dynamic filter pushdown and other optimizations rely on whether a join preserves
+rows from its inputs. The tables below summarise which sides are preserved for
+post-join output filtering and for evaluation of `ON`-clause predicates.
+
+## Output filtering

Review Comment:
   Thanks so much. This is an instant commit / approve (with verification that it's correct) if made as its own PR.
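
Output filters and `ON`-clause predicates follow different rules. A predicate inside `ON` decides which rows match, not which rows are output. Evaluating it early on one input is therefore safe only if that input's unmatched rows are dropped anyway. The sketch below uses a hypothetical `JoinKind` enum and omits mark joins. The guide's tables are not shown in this hunk, so each case is reasoned out individually:

```rust
/// Hypothetical, trimmed-down join kind used only for illustration.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    LeftAnti,
    RightSemi,
    RightAnti,
}

/// Returns `(left, right)`: whether an `ON` predicate that references only
/// that side's columns may be evaluated on that input before the join.
pub const fn on_clause_pushdown(kind: JoinKind) -> (bool, bool) {
    match kind {
        // Failing the predicate only means "no match", and these joins never
        // emit an unmatched row from either input, so filtering early is safe.
        JoinKind::Inner | JoinKind::LeftSemi | JoinKind::RightSemi => (true, true),
        // Unmatched left rows are still emitted (null-padded for Left, as the
        // result itself for LeftAnti), so the left input must not be filtered.
        JoinKind::Left | JoinKind::LeftAnti => (false, true),
        // Mirror image of the previous arm.
        JoinKind::Right | JoinKind::RightAnti => (true, false),
        // Unmatched rows from both sides are emitted.
        JoinKind::Full => (false, false),
    }
}
```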



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

