adriangb commented on code in PR #16086: URL: https://github.com/apache/datafusion/pull/16086#discussion_r2094817127
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -848,6 +848,68 @@ mod tests {
         assert_eq!(read.len(), 0);
     }

+    #[tokio::test]
+    async fn evolved_schema_column_type_filter_strings() {
+        // The table and filter have a common data type, but the file schema differs
+        let c1: ArrayRef =
+            Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")]));
+        let batch = create_batch(vec![("c1", c1.clone())]);
+
+        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
+
+        // Predicate should prune all row groups
+        let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema.clone())
+            .round_trip_to_batches(vec![batch.clone()])
+            .await
+            .unwrap();
+        assert_eq!(read.len(), 0);
+
+        // Predicate should prune no row groups
+        let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema)
+            .round_trip_to_batches(vec![batch])
+            .await
+            .unwrap();
+        assert_eq!(read.len(), 1);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_column_type_filter_ints() {
+        // The table and filter have a common data type, but the file schema differs
+        let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
+        let batch = create_batch(vec![("c1", c1.clone())]);
+
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
+
+        // Predicate should prune all row groups
+        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema.clone())
+            .round_trip_to_batches(vec![batch.clone()])
+            .await
+            .unwrap();
+        assert_eq!(read.len(), 0);
+
+        // TODO: this is failing on main, and has been for a long time!
+        // See <comment on PR>
+        // // Predicate should prune no row groups
+        // let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
+        // let read = RoundTrip::new()
+        //     .with_predicate(filter)
+        //     .with_schema(schema)
+        //     .round_trip_to_batches(vec![batch])
+        //     .await
+        //     .unwrap();
+        // assert_eq!(read.len(), 1);

Review Comment:
   This has been failing at least as far back as v46.0.0 with this diff:

   ```diff
   diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
   index 3b71593b3..60b403aff 100644
   --- a/datafusion/core/src/datasource/file_format/parquet.rs
   +++ b/datafusion/core/src/datasource/file_format/parquet.rs
   @@ -67,13 +67,14 @@ pub(crate) mod test_util {
                .into_iter()
                .zip(tmp_files.into_iter())
                .map(|(batch, mut output)| {
   -                let builder = parquet::file::properties::WriterProperties::builder();
   -                let props = if multi_page {
   +                let mut builder = parquet::file::properties::WriterProperties::builder();
   +                builder = if multi_page {
                        builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
                    } else {
                        builder
   -                }
   -                .build();
   +                };
   +                builder = builder.set_bloom_filter_enabled(true);
   +                let props = builder.build();

                    let mut writer = parquet::arrow::ArrowWriter::try_new(
                        &mut output,
   diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
   index 888f3ad9e..240d84783 100644
   --- a/datafusion/core/src/datasource/physical_plan/parquet.rs
   +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
   @@ -489,6 +489,37 @@ mod tests {
            assert_eq!(read.len(), 0);
        }

   +
   +    #[tokio::test]
   +    async fn evolved_schema_column_type_filter_ints() {
   +        // The table and filter have a common data type, but the file schema differs
   +        let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
   +        let batch = create_batch(vec![("c1", c1.clone())]);
   +
   +        let schema =
   +            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
   +
   +        // // Predicate should prune all row groups
   +        // let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
   +        // let read = RoundTrip::new()
   +        //     .with_predicate(filter)
   +        //     .with_schema(schema.clone())
   +        //     .round_trip_to_batches(vec![batch.clone()])
   +        //     .await
   +        //     .unwrap();
   +        // assert_eq!(read.len(), 0);
   +
   +        // Predicate should prune no row groups
   +        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
   +        let read = RoundTrip::new()
   +            .with_predicate(filter)
   +            .with_schema(schema)
   +            .round_trip_to_batches(vec![batch])
   +            .await
   +            .unwrap();
   +        assert_eq!(read.len(), 1);
   +    }
   +
        #[tokio::test]
        async fn evolved_schema_disjoint_schema_filter() {
            let c1: ArrayRef =
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org