This is an automated email from the ASF dual-hosted git repository.

liukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 06f01eaf6 change the null type in the row filter (#3470)
06f01eaf6 is described below

commit 06f01eaf65b78d73ffc7fb0bf95619e1e884e850
Author: Kun Liu <[email protected]>
AuthorDate: Thu Sep 15 08:56:22 2022 +0800

    change the null type in the row filter (#3470)
---
 .../src/physical_plan/file_format/row_filter.rs    | 26 +++++++++++++++++-----
 1 file changed, 20 insertions(+), 6 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs
index 56bdba557..473856b7b 100644
--- a/datafusion/core/src/physical_plan/file_format/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -19,7 +19,7 @@ use arrow::array::{Array, BooleanArray};
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use datafusion_common::{Column, Result, ScalarValue, ToDFSchema};
+use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema};
 use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};
 
 use datafusion_expr::{uncombine_filter, Expr};
@@ -202,7 +202,18 @@ impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
     fn mutate(&mut self, expr: Expr) -> Result<Expr> {
         if let Expr::Column(Column { name, .. }) = &expr {
             if self.file_schema.field_with_name(name).is_err() {
-                return Ok(Expr::Literal(ScalarValue::Null));
+                // the column expr must be in the table schema
+                return match self.table_schema.field_with_name(name) {
+                    Ok(field) => {
+                        // return the null value corresponding to the data type
+                        let null_value = ScalarValue::try_from(field.data_type())?;
+                        Ok(Expr::Literal(null_value))
+                    }
+                    Err(e) => {
+                        // If the column is not in the table schema, should throw the error
+                        Err(DataFusionError::ArrowError(e))
+                    }
+                };
             }
         }
 
@@ -314,12 +325,13 @@ mod test {
     use crate::physical_plan::file_format::row_filter::FilterCandidateBuilder;
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::ScalarValue;
-    use datafusion_expr::{col, lit};
+    use datafusion_expr::{cast, col, lit};
     use parquet::arrow::parquet_to_arrow_schema;
     use parquet::file::reader::{FileReader, SerializedFileReader};
 
     // Assume a column expression for a column not in the table schema is a projected column and ignore it
     #[test]
+    #[should_panic(expected = "building candidate failed")]
     fn test_filter_candidate_builder_ignore_projected_columns() {
         let testdata = crate::test_util::parquet_test_data();
         let file = std::fs::File::open(&format!("{}/alltypes_plain.parquet", testdata))
@@ -337,7 +349,7 @@ mod test {
 
         let candidate = FilterCandidateBuilder::new(expr, &table_schema, &table_schema)
             .build(metadata)
-            .expect("building candidate");
+            .expect("building candidate failed");
 
         assert!(candidate.is_none());
     }
@@ -386,8 +398,10 @@ mod test {
             Field::new("float_col", DataType::Float32, true),
         ]);
 
-        let expr = col("bigint_col").eq(col("int_col"));
-        let expected_candidate_expr = col("bigint_col").eq(lit(ScalarValue::Null));
+        // The parquet file with `file_schema` just has `bigint_col` and `float_col` column, and don't have the `int_col`
+        let expr = col("bigint_col").eq(cast(col("int_col"), DataType::Int64));
+        let expected_candidate_expr =
            col("bigint_col").eq(cast(lit(ScalarValue::Int32(None)), DataType::Int64));
 
         let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
             .build(metadata)

Reply via email to