This is an automated email from the ASF dual-hosted git repository.
liukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 06f01eaf6 change the null type in the row filter (#3470)
06f01eaf6 is described below
commit 06f01eaf65b78d73ffc7fb0bf95619e1e884e850
Author: Kun Liu <[email protected]>
AuthorDate: Thu Sep 15 08:56:22 2022 +0800
change the null type in the row filter (#3470)
---
.../src/physical_plan/file_format/row_filter.rs | 26 +++++++++++++++++-----
1 file changed, 20 insertions(+), 6 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs
index 56bdba557..473856b7b 100644
--- a/datafusion/core/src/physical_plan/file_format/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -19,7 +19,7 @@ use arrow::array::{Array, BooleanArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
-use datafusion_common::{Column, Result, ScalarValue, ToDFSchema};
+use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};
use datafusion_expr::{uncombine_filter, Expr};
@@ -202,7 +202,18 @@ impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
if let Expr::Column(Column { name, .. }) = &expr {
if self.file_schema.field_with_name(name).is_err() {
- return Ok(Expr::Literal(ScalarValue::Null));
+ // the column expr must be in the table schema
+ return match self.table_schema.field_with_name(name) {
+ Ok(field) => {
+ // return the null value corresponding to the data type
+ let null_value = ScalarValue::try_from(field.data_type())?;
+ Ok(Expr::Literal(null_value))
+ }
+ Err(e) => {
+ // If the column is not in the table schema, should throw the error
+ Err(DataFusionError::ArrowError(e))
+ }
+ };
}
}
@@ -314,12 +325,13 @@ mod test {
use crate::physical_plan::file_format::row_filter::FilterCandidateBuilder;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::ScalarValue;
- use datafusion_expr::{col, lit};
+ use datafusion_expr::{cast, col, lit};
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
// Assume a column expression for a column not in the table schema is a projected column and ignore it
#[test]
+ #[should_panic(expected = "building candidate failed")]
fn test_filter_candidate_builder_ignore_projected_columns() {
let testdata = crate::test_util::parquet_test_data();
let file = std::fs::File::open(&format!("{}/alltypes_plain.parquet", testdata))
@@ -337,7 +349,7 @@ mod test {
let candidate = FilterCandidateBuilder::new(expr, &table_schema, &table_schema)
.build(metadata)
- .expect("building candidate");
+ .expect("building candidate failed");
assert!(candidate.is_none());
}
@@ -386,8 +398,10 @@ mod test {
Field::new("float_col", DataType::Float32, true),
]);
- let expr = col("bigint_col").eq(col("int_col"));
- let expected_candidate_expr = col("bigint_col").eq(lit(ScalarValue::Null));
+ // The parquet file with `file_schema` just has `bigint_col` and `float_col` column, and don't have the `int_col`
+ let expr = col("bigint_col").eq(cast(col("int_col"), DataType::Int64));
+ let expected_candidate_expr =
+ col("bigint_col").eq(cast(lit(ScalarValue::Int32(None)), DataType::Int64));
let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
.build(metadata)