alamb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2159679800
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +539,84 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+    partition_values: Vec<ScalarValue>,
+    partition_fields: &[FieldRef],
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
+            let logical_field = match logical_file_schema.field_with_name(column.name()) {
+                Ok(field) => field,
+                Err(e) => {
+                    // If the column is a partition field, we can use the partition value
+                    for (partition_field, partition_value) in
+                        partition_fields.iter().zip(partition_values.iter())
+                    {
+                        if partition_field.name() == column.name() {
+                            return Ok(Transformed::yes(expressions::lit(
+                                partition_value.clone(),
+                            )));
+                        }
+                    }
+                    // If the column is not found in the logical schema, return an error.
+                    // This should probably never be hit unless something upstream broke, but nonetheless it's better
+                    // for us to return a handleable error than to panic / do something unexpected.
+                    return Err(e.into());
+                }
+            };
+            let Ok(physical_field) = physical_file_schema.field_with_name(column.name())
+            else {
+                if !logical_field.is_nullable() {
+                    return exec_err!(
+                        "Non-nullable column '{}' is missing from the physical schema",
+                        column.name()
+                    );
+                }
+                // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do.
+                // TODO: do we need to sync this with what the `SchemaAdapter` actually does?
+                // While the default implementation fills in nulls, in theory a custom `SchemaAdapter` could do something else!
+                let value = ScalarValue::Null.cast_to(logical_field.data_type())?;
+                return Ok(Transformed::yes(expressions::lit(value)));
+            };
+
+            if logical_field.data_type() == physical_field.data_type() {
+                return Ok(Transformed::no(expr));
+            }
+
+            // If the logical field and physical field are different, we need to cast
+            // the column to the logical field's data type.
+            // We will try later to move the cast to literal values if possible, which is computationally cheaper.

Review Comment:
   👍
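For readers following along, here is a minimal sketch of how `cast_expr_to_schema` might be invoked, assuming the signature shown in the hunk above is in scope; the schemas, column name, and literal are invented for illustration:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;

fn rewrite_example() -> Result<()> {
    // Hypothetical scenario: the file was written with an Int32 column `a`,
    // but the table (logical) schema has since been widened to Int64.
    let physical_file_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let logical_file_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

    // Predicate `a = 5`, planned against the logical schema.
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        col("a", &logical_file_schema)?,
        Operator::Eq,
        lit(ScalarValue::Int64(Some(5))),
    ));

    // No partition columns in this example. The rewrite inserts a cast for
    // `a`; later passes may move the cast onto the literal, the cheaper option.
    // `cast_expr_to_schema` is the function from the hunk above (assumed in scope).
    let _rewritten = cast_expr_to_schema(
        predicate,
        &physical_file_schema,
        &logical_file_schema,
        vec![],
        &[],
    )?;
    Ok(())
}
```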
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +539,84 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+    partition_values: Vec<ScalarValue>,
+    partition_fields: &[FieldRef],
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
+            let logical_field = match logical_file_schema.field_with_name(column.name()) {
+                Ok(field) => field,
+                Err(e) => {
+                    // If the column is a partition field, we can use the partition value
+                    for (partition_field, partition_value) in
+                        partition_fields.iter().zip(partition_values.iter())
+                    {
+                        if partition_field.name() == column.name() {
+                            return Ok(Transformed::yes(expressions::lit(
+                                partition_value.clone(),
+                            )));
+                        }
+                    }
+                    // If the column is not found in the logical schema, return an error.
+                    // This should probably never be hit unless something upstream broke, but nonetheless it's better
+                    // for us to return a handleable error than to panic / do something unexpected.

Review Comment:
   👍


##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -520,111 +489,15 @@ mod test {
         let expr = col("int64_list").is_not_null();
         let expr = logical2physical(&expr, &table_schema);
 
-        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
         let table_schema = Arc::new(table_schema.clone());
 
-        let candidate = FilterCandidateBuilder::new(
-            expr,
-            table_schema.clone(),
-            table_schema,
-            schema_adapter_factory,
-        )
-        .build(metadata)
-        .expect("building candidate");
+        let candidate = FilterCandidateBuilder::new(expr, table_schema.clone())
+            .build(metadata)
+            .expect("building candidate");
 
         assert!(candidate.is_none());
     }
 
-    #[test]
-    fn test_filter_type_coercion() {

Review Comment:
   Is there any way to keep this test (or something like it) that shows reading with a predicate of a different schema is correctly coerced?
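One hedged way to keep that coverage without the removed `SchemaAdapter` plumbing, assuming the coercion now happens in `cast_expr_to_schema`, is to exercise the rewrite directly against a batch with the physical schema instead of a full Parquet round-trip:

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;

#[test]
fn test_predicate_coerced_to_physical_schema() -> Result<()> {
    // The file stores `c1` as Int32; the table schema declares it Int64.
    let physical = Schema::new(vec![Field::new("c1", DataType::Int32, true)]);
    let logical = Schema::new(vec![Field::new("c1", DataType::Int64, true)]);

    // `c1 = 0`, planned against the logical (table) schema.
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        col("c1", &logical)?,
        Operator::Eq,
        lit(ScalarValue::Int64(Some(0))),
    ));

    let rewritten = cast_expr_to_schema(predicate, &physical, &logical, vec![], &[])?;

    // The rewritten predicate must evaluate against a batch with the
    // *physical* (Int32) schema without erroring.
    let batch = RecordBatch::try_new(
        Arc::new(physical),
        vec![Arc::new(Int32Array::from(vec![0, 1, 2]))],
    )?;
    let values = rewritten.evaluate(&batch)?.into_array(batch.num_rows())?;
    assert_eq!(values.len(), 3);
    Ok(())
}
```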
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +539,84 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(

Review Comment:
   I think this is more general than just parquet, so perhaps we could move it into the `datafusion-physical-schema` crate, or perhaps somewhere near the physical planner.


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +539,84 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(

Review Comment:
   I also think it would really help (perhaps as a follow-on PR) to add some more specific unit tests. I wonder if an API like the following makes sense:
   ```rust
   struct PhysicalExprSchemaRewriter {
       ...
   }

   // rewrite a predicate
   let new_predicate =
       PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema)
           // optionally provide partition values
           .with_partition_columns(partition_fields, partition_values)
           .convert(predicate)?;
   ```
   Then I think writing unit tests would be easy and we could adapt / extend the code over time -- and it would set us up for adapting more sophisticated expressions like field extraction...


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +539,84 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+    partition_values: Vec<ScalarValue>,
+    partition_fields: &[FieldRef],
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
+            let logical_field = match logical_file_schema.field_with_name(column.name()) {

Review Comment:
   nit: we could reduce a level of nesting with something like
   ```suggestion
           let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() else {
               return Ok(Transformed::no(expr));
           };
   ```


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +532,62 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
+            let logical_field = logical_file_schema.field_with_name(column.name())?;
+            let Ok(physical_field) = physical_file_schema.field_with_name(column.name())
+            else {
+                if !logical_field.is_nullable() {
+                    return exec_err!(
+                        "Non-nullable column '{}' is missing from the physical schema",
+                        column.name()
+                    );

Review Comment:
   agreed -- that might also be a nice use case for a builder-style struct
   ```
   let new_predicate =
       PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema)
           .with_identifier(file_name)
           .convert(predicate)?;
   ```
   🤔
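Pulling the two builder suggestions together, here is a hedged sketch of what `PhysicalExprSchemaRewriter` could look like; the names and the delegation to `cast_expr_to_schema` are assumptions for discussion, not a final API:

```rust
use std::sync::Arc;

use arrow::datatypes::{FieldRef, Schema};
use datafusion_common::{Result, ScalarValue};
use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical builder that owns the schema-rewrite inputs so unit tests
/// can construct it directly, without a ParquetOpener.
pub struct PhysicalExprSchemaRewriter<'a> {
    physical_file_schema: &'a Schema,
    logical_file_schema: &'a Schema,
    partition_fields: &'a [FieldRef],
    partition_values: Vec<ScalarValue>,
}

impl<'a> PhysicalExprSchemaRewriter<'a> {
    pub fn new(physical_file_schema: &'a Schema, logical_file_schema: &'a Schema) -> Self {
        Self {
            physical_file_schema,
            logical_file_schema,
            partition_fields: &[],
            partition_values: vec![],
        }
    }

    /// Optionally provide the partition columns and their values for this file.
    pub fn with_partition_columns(
        mut self,
        partition_fields: &'a [FieldRef],
        partition_values: Vec<ScalarValue>,
    ) -> Self {
        self.partition_fields = partition_fields;
        self.partition_values = partition_values;
        self
    }

    /// Rewrite `expr` so it can be evaluated against the physical file schema.
    pub fn convert(self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
        // For this sketch, delegate to the free function from the diff above;
        // a `with_identifier(file_name)` hook for richer error messages could
        // be layered on in the same way.
        cast_expr_to_schema(
            expr,
            self.physical_file_schema,
            self.logical_file_schema,
            self.partition_values,
            self.partition_fields,
        )
    }
}
```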