alamb commented on code in PR #19128:
URL: https://github.com/apache/datafusion/pull/19128#discussion_r2602788159


##########
datafusion/physical-expr-adapter/src/schema_rewriter.rs:
##########
@@ -34,6 +35,35 @@ use datafusion_physical_expr::{
 };
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
+/// Replace column references in the given physical expression with literal values.
+///
+/// This is used to substitute partition column references with their literal values during expression rewriting.
+/// In the future this may be used to replace columns that can be proven to be constant from statistical analysis
+/// with their literal values as well.
+///
+/// # Arguments
+/// - `expr`: The physical expression in which to replace column references.
+/// - `replacements`: A mapping from column names to their corresponding literal `ScalarValue`s.
+///
+/// # Returns
+/// - `Result<Arc<dyn PhysicalExpr>>`: The rewritten physical expression with columns replaced by literals.
+pub fn replace_columns_with_literals(
+    expr: Arc<dyn PhysicalExpr>,
+    replacements: &HashMap<&str, &ScalarValue>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<Column>()

Review Comment:
   the `let` chains are quite cool
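
   For context, a minimal usage sketch of the new function. The `inline_partition_value` helper, the `year` column, and the import path are illustrative only and not part of the PR:

   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   use datafusion_common::{Result, ScalarValue};
   // Illustrative import: the exact re-export path of the new function may differ.
   use datafusion_physical_expr_adapter::schema_rewriter::replace_columns_with_literals;
   use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

   /// For a file in partition `year=2024`, bind the partition column to its
   /// known value so the predicate can be simplified for this file.
   fn inline_partition_value(
       predicate: Arc<dyn PhysicalExpr>,
   ) -> Result<Arc<dyn PhysicalExpr>> {
       let year = ScalarValue::Int32(Some(2024));
       let replacements: HashMap<&str, &ScalarValue> =
           HashMap::from([("year", &year)]);
       // References to the column named "year" are rewritten to `lit(2024)`.
       replace_columns_with_literals(predicate, &replacements)
   }
   ```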



##########
datafusion/physical-expr-adapter/src/schema_rewriter.rs:
##########
@@ -34,6 +35,35 @@ use datafusion_physical_expr::{
 };
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
+/// Replace column references in the given physical expression with literal values.
+///
+/// This is used to substitute partition column references with their literal values during expression rewriting.

Review Comment:
   Another potential use case is filling values for columns with default values (rather than `NULL`).



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -178,20 +189,23 @@ impl FileOpener for ParquetOpener {
             // we can end the stream early.
             let mut file_pruner = predicate
                 .as_ref()
-                .map(|p| {
-                    Ok::<_, DataFusionError>(
-                        (is_dynamic_physical_expr(p) | partitioned_file.has_statistics())
-                            .then_some(FilePruner::new(
-                                Arc::clone(p),
-                                &logical_file_schema,
-                                partition_fields.clone(),
-                                partitioned_file.clone(),
-                                predicate_creation_errors.clone(),
-                            )?),
-                    )
+                .filter(|p| {
+                    // Make a FilePruner only if there is either a dynamic expr in the predicate or the file has file-level statistics.
+                    // File-level statistics may allow us to prune the file without loading any row groups or metadata.
+                    // If there is a dynamic filter we may be able to prune the file later as the dynamic filter is updated.
+                    // This does allow the case where there is a dynamic filter but no statistics, in which case

Review Comment:
   It took me a bit to parse this comment
   
   I left a suggestion of how we might be able to improve it
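
   Purely as an illustrative fragment (not the suggestion referenced above, and the surrounding code in the PR may differ), the logic that comment describes reads roughly as:

   ```rust
   let mut file_pruner = predicate
       .as_ref()
       // Only build a FilePruner when it has a chance to prune this file:
       // - file-level statistics can prune it before reading row groups or metadata
       // - a dynamic filter may tighten later and prune the file mid-stream
       .filter(|p| is_dynamic_physical_expr(p) || partitioned_file.has_statistics())
       .map(|p| {
           FilePruner::new(
               Arc::clone(p),
               &logical_file_schema,
               partition_fields.clone(),
               partitioned_file.clone(),
               predicate_creation_errors.clone(),
           )
       })
       .transpose()?;
   ```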



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -946,17 +946,15 @@ mod test {
                 batch_size: 1024,
                 limit: None,
                 predicate: Some(predicate),
-                logical_file_schema: file_schema.clone(),
+                table_schema: TableSchema::new(
+                    file_schema.clone(),

Review Comment:
   ```suggestion
                       Arc::clone(file_schema),
   ```



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -946,17 +946,15 @@ mod test {
                 batch_size: 1024,
                 limit: None,
                 predicate: Some(predicate),
-                logical_file_schema: file_schema.clone(),
+                table_schema: TableSchema::new(
+                    file_schema.clone(),

Review Comment:
   (and several other examples below)



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -139,10 +137,23 @@ impl FileOpener for ParquetOpener {
 
         let batch_size = self.batch_size;
 
+        let mut predicate = {
+            let partition_values: HashMap<&str, &ScalarValue> = self
+                .table_schema
+                .table_partition_cols()
+                .iter()
+                .zip(partitioned_file.partition_values.iter())
+                .map(|(field, value)| (field.name().as_str(), value))
+                .collect();
+
+            self.predicate

Review Comment:
   is it worth skipping this clone/rewrite if there are no partition_values (aka if `partition_values.is_empty()`)?
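
   A rough sketch of that skip, written as a fragment against the names in the diff above (whether the clone is worth avoiding is the open question):

   ```rust
   let mut predicate = if partition_values.is_empty() {
       // No partition columns: nothing to substitute, so skip the clone and
       // the extra walk over the predicate tree.
       self.predicate.clone()
   } else {
       self.predicate
           .as_ref()
           .map(|p| replace_columns_with_literals(Arc::clone(p), &partition_values))
           .transpose()?
   };
   ```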



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -877,13 +878,12 @@ mod test {
                 batch_size: 1024,
                 limit: None,
                 predicate: Some(predicate),
-                logical_file_schema: schema.clone(),
+                table_schema: TableSchema::from_file_schema(schema.clone()),

Review Comment:
   ```suggestion
                table_schema: TableSchema::from_file_schema(Arc::clone(&schema)),
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

