alamb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2159679800


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +539,84 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+    partition_values: Vec<ScalarValue>,
+    partition_fields: &[FieldRef],
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
+            let logical_field = match logical_file_schema.field_with_name(column.name()) {
+                Ok(field) => field,
+                Err(e) => {
+                    // If the column is a partition field, we can use the partition value
+                    for (partition_field, partition_value) in
+                        partition_fields.iter().zip(partition_values.iter())
+                    {
+                        if partition_field.name() == column.name() {
+                            return Ok(Transformed::yes(expressions::lit(
+                                partition_value.clone(),
+                            )));
+                        }
+                    }
+                    // If the column is not found in the logical schema, return an error.
+                    // This should probably never be hit unless something upstream broke, but nonetheless it's better
+                    // for us to return a handleable error than to panic / do something unexpected.
+                    return Err(e.into());
+                }
+            };
+            let Ok(physical_field) = physical_file_schema.field_with_name(column.name())
+            else {
+                if !logical_field.is_nullable() {
+                    return exec_err!(
+                        "Non-nullable column '{}' is missing from the physical 
schema",
+                        column.name()
+                    );
+                }
+                // If the column is missing from the physical schema, fill it in with nulls as `SchemaAdapter` would do.
+                // TODO: do we need to sync this with what the `SchemaAdapter` actually does?
+                // While the default implementation fills in nulls, in theory a custom `SchemaAdapter` could do something else!
+                let value = ScalarValue::Null.cast_to(logical_field.data_type())?;
+                return Ok(Transformed::yes(expressions::lit(value)));
+            };
+
+            if logical_field.data_type() == physical_field.data_type() {
+                return Ok(Transformed::no(expr));
+            }
+
+            // If the logical field and physical field are different, we need to cast
+            // the column to the logical field's data type.
+            // We will try later to move the cast to literal values if possible, which is computationally cheaper.

Review Comment:
   👍 
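   To illustrate the preference described in the doc comment: for a column that is `Int64` in the logical schema but `Int32` in the physical file, the intended rewrite would look roughly like this (hypothetical predicate, not taken from the PR's tests):
   ```rust
   // predicate as written against the logical schema:
   //     CAST(c1 AS Int64) = 42_i64   -- casts every row of the column
   // preferred form after moving the cast to the literal:
   //     c1 = CAST(42_i64 AS Int32)   -- casts the literal once
   ```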



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +539,84 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+    partition_values: Vec<ScalarValue>,
+    partition_fields: &[FieldRef],
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
+            let logical_field = match logical_file_schema.field_with_name(column.name()) {
+                Ok(field) => field,
+                Err(e) => {
+                    // If the column is a partition field, we can use the partition value
+                    for (partition_field, partition_value) in
+                        partition_fields.iter().zip(partition_values.iter())
+                    {
+                        if partition_field.name() == column.name() {
+                            return Ok(Transformed::yes(expressions::lit(
+                                partition_value.clone(),
+                            )));
+                        }
+                    }
+                    // If the column is not found in the logical schema, return an error.
+                    // This should probably never be hit unless something upstream broke, but nonetheless it's better
+                    // for us to return a handleable error than to panic / do something unexpected.

Review Comment:
   👍 



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -520,111 +489,15 @@ mod test {
         let expr = col("int64_list").is_not_null();
         let expr = logical2physical(&expr, &table_schema);
 
-        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
         let table_schema = Arc::new(table_schema.clone());
 
-        let candidate = FilterCandidateBuilder::new(
-            expr,
-            table_schema.clone(),
-            table_schema,
-            schema_adapter_factory,
-        )
-        .build(metadata)
-        .expect("building candidate");
+        let candidate = FilterCandidateBuilder::new(expr, table_schema.clone())
+            .build(metadata)
+            .expect("building candidate");
 
         assert!(candidate.is_none());
     }
 
-    #[test]
-    fn test_filter_type_coercion() {

Review Comment:
   Is there any way to keep this test (or something like it) that shows reading with a predicate of a different schema is correctly coerced?
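   Something along these lines, for example, exercising the new `cast_expr_to_schema` helper directly (a rough sketch; names, types, and the final assertion are illustrative only):
   ```rust
   #[test]
   fn test_filter_type_coercion() {
       // the logical/table schema says Int64 but the file physically stores Int32
       let logical_file_schema = Schema::new(vec![Field::new("c1", DataType::Int64, true)]);
       let physical_file_schema = Schema::new(vec![Field::new("c1", DataType::Int32, true)]);

       let expr = col("c1").eq(lit(1i64));
       let expr = logical2physical(&expr, &logical_file_schema);

       let rewritten = cast_expr_to_schema(
           expr,
           &physical_file_schema,
           &logical_file_schema,
           vec![],
           &[],
       )
       .expect("rewriting predicate");

       // the rewritten predicate should coerce the comparison (ideally by casting the
       // literal) rather than leave an Int64 comparison against an Int32 column
       assert!(rewritten.to_string().contains("CAST"));
   }
   ```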



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +539,84 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(

Review Comment:
   I think this is more general than just parquet, so perhaps we could move it into the `datafusion-physical-schema` crate or perhaps somewhere near the physical planner



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +539,84 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(

Review Comment:
   
   
   I also think it would really help (perhaps as a follow on PR) to add some more specific unit tests.
   
   
   I wonder if an API like the following makes sense:
   ```rust
   struct PhysicalExprSchemaRewriter { 
   ...
   }
   
   
   // rewrite a predicate
   let new_predicate = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema)
     // optionally provide partition values
     .with_partition_columns(partition_fields, partition_values)
     .convert(predicate)?;
   
   ```
   
   Then I think writing unit tests would be easy and we could adapt / extend the code over time -- and it would set us up for adapting more sophisticated expressions like field extraction...
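   For example, a unit test against that API could look roughly like this (entirely hypothetical, since the struct does not exist yet):
   ```rust
   #[test]
   fn rewrites_predicate_to_physical_file_schema() {
       let logical_file_schema = Schema::new(vec![Field::new("c1", DataType::Int64, true)]);
       // the file is missing the column entirely
       let physical_file_schema = Schema::new(vec![Field::new("other", DataType::Utf8, true)]);

       let predicate = logical2physical(&col("c1").is_null(), &logical_file_schema);

       let rewritten = PhysicalExprSchemaRewriter::new(&physical_file_schema, &logical_file_schema)
           .convert(predicate)
           .unwrap();

       // the missing, nullable column should have been replaced by a typed NULL literal,
       // so the predicate evaluates to true for every row of this file
   }
   ```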
   
    



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +539,84 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+    partition_values: Vec<ScalarValue>,
+    partition_fields: &[FieldRef],
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
+            let logical_field = match logical_file_schema.field_with_name(column.name()) {

Review Comment:
   nit: we could reduce a level of nesting with something like
   ```suggestion
           let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() else {
               return Ok(Transformed::no(expr));
           };
   ```



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +532,62 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
+            let logical_field = logical_file_schema.field_with_name(column.name())?;
+            let Ok(physical_field) = physical_file_schema.field_with_name(column.name())
+            else {
+                if !logical_field.is_nullable() {
+                    return exec_err!(
+                        "Non-nullable column '{}' is missing from the physical 
schema",
+                        column.name()
+                    );

Review Comment:
   agreed -- that might also be a nice use case for a builder-style struct
   ```
   let new_predicate = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema)
     .with_identifier(file_name)
     .convert(predicate)?;
   ```
   🤔 
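   With the identifier threaded through, the missing-column error above could then name the file, e.g. (illustrative only; `file_name` is the hypothetical identifier):
   ```rust
   exec_err!(
       "Non-nullable column '{}' is missing from the physical schema of file '{}'",
       column.name(),
       file_name,
   )
   ```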



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

