This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 66330bd43d fix: `reassign_predicate_columns` w/ in-list expr (#6114)
66330bd43d is described below
commit 66330bd43d13710889a2e5c8feb7ceecc145b87f
Author: Marco Neumann <[email protected]>
AuthorDate: Tue Apr 25 20:00:17 2023 +0200
fix: `reassign_predicate_columns` w/ in-list expr (#6114)
We need to also re-assign the schema stored within `InListExpr` at the
same time when we replace the children.
---
datafusion/physical-expr/src/utils.rs | 67 ++++++++++++++++++++++++++++++++---
1 file changed, 63 insertions(+), 4 deletions(-)
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 472914f8d0..89b51dada8 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::equivalence::EquivalentClass;
-use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
+use crate::expressions::{BinaryExpr, Column, InListExpr, UnKnownColumn};
use crate::{
EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
PhysicalSortRequirement,
};
@@ -573,8 +573,10 @@ pub fn reassign_predicate_columns(
schema: &SchemaRef,
ignore_not_found: bool,
) -> Result<Arc<dyn PhysicalExpr>> {
- pred.transform(&|expr| {
- if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+ pred.transform_down(&|expr| {
+ let expr_any = expr.as_any();
+
+ if let Some(column) = expr_any.downcast_ref::<Column>() {
let index = match schema.index_of(column.name()) {
Ok(idx) => idx,
Err(_) if ignore_not_found => usize::MAX,
@@ -584,6 +586,26 @@ pub fn reassign_predicate_columns(
column.name(),
index,
))));
+ } else if let Some(in_list) = expr_any.downcast_ref::<InListExpr>() {
+ // transform child first
+ let expr = reassign_predicate_columns(
+ in_list.expr().clone(),
+ schema,
+ ignore_not_found,
+ )?;
+ let list = in_list
+ .list()
+ .iter()
+ .map(|expr| {
+ reassign_predicate_columns(expr.clone(), schema, ignore_not_found)
+ })
+ .collect::<Result<Vec<_>>>()?;
+ return Ok(Transformed::Yes(Arc::new(InListExpr::new(
+ expr,
+ list,
+ in_list.negated(),
+ schema.as_ref(),
+ ))));
}
Ok(Transformed::No(expr))
@@ -593,7 +615,7 @@ pub fn reassign_predicate_columns(
#[cfg(test)]
mod tests {
use super::*;
- use crate::expressions::{binary, cast, col, lit, Column, Literal};
+ use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
use crate::PhysicalSortExpr;
use arrow::compute::SortOptions;
use datafusion_common::{Result, ScalarValue};
@@ -918,4 +940,41 @@ mod tests {
}));
Ok(())
}
+
+ #[test]
+ fn test_reassign_predicate_columns_in_list() {
+ let int_field = Field::new("should_not_matter", DataType::Int64, true);
+ let dict_field = Field::new(
+ "id",
+ DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+ true,
+ );
+ let schema_small = Arc::new(Schema::new(vec![dict_field.clone()]));
+ let schema_big = Arc::new(Schema::new(vec![int_field, dict_field]));
+ let pred = in_list(
+ Arc::new(Column::new_with_schema("id", &schema_big).unwrap()),
+ vec![lit(ScalarValue::Dictionary(
+ Box::new(DataType::Int32),
+ Box::new(ScalarValue::from("2")),
+ ))],
+ &false,
+ &schema_big,
+ )
+ .unwrap();
+
+ let actual = reassign_predicate_columns(pred, &schema_small, false).unwrap();
+
+ let expected = in_list(
+ Arc::new(Column::new_with_schema("id", &schema_small).unwrap()),
+ vec![lit(ScalarValue::Dictionary(
+ Box::new(DataType::Int32),
+ Box::new(ScalarValue::from("2")),
+ ))],
+ &false,
+ &schema_small,
+ )
+ .unwrap();
+
+ assert_eq!(actual.as_ref(), expected.as_any());
+ }
}