alamb commented on code in PR #15263:
URL: https://github.com/apache/datafusion/pull/15263#discussion_r2001263333
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -224,6 +224,327 @@ mod tests {
         )
     }
 
+    #[tokio::test]
+    async fn test_pushdown_with_missing_column_in_file() {
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+
+        let file_schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
+
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, true),
+            Field::new("c2", DataType::Int32, true),
+        ]));
+
+        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
+
+        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
+        // the default behavior is to fill in missing columns with nulls.
+        // Thus this predicate will come back as false.
+        let filter = col("c2").eq(lit(1_i32));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch.clone()])
+            .await;
+        let total_rows = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 3, "Expected all rows to be pruned");
+
+        // If we explicitly allow nulls the rest of the predicate should work
+        let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
+        let rt = RoundTrip::new()
+            .with_schema(table_schema.clone())
+            .with_predicate(filter.clone())
+            .with_pushdown_predicate()
+            .round_trip(vec![batch.clone()])
+            .await;
+        let batches = rt.batches.unwrap();
+        #[rustfmt::skip]
+        let expected = [
+            "+----+----+",
+            "| c1 | c2 |",
+            "+----+----+",
+            "| 1  |    |",
+            "+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        let metric = get_value(&metrics, "pushdown_rows_pruned");
+        assert_eq!(metric, 2, "Expected 2 rows to be pruned");
+    }
+
+    #[tokio::test]
+    async fn test_pushdown_with_missing_column_in_file_multiple_types() {
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+
+        let file_schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
+
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, true),
+            Field::new("c2", DataType::Utf8, true),
+        ]));
+
+        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
+
+        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
+        // the default behavior is to fill in missing columns with nulls.
+        // Thus this predicate will come back as false.
+ let filter = col("c2").eq(lit("abc")); + let rt = RoundTrip::new() + .with_schema(table_schema.clone()) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch.clone()]) + .await; + let total_rows = rt + .batches + .unwrap() + .iter() + .map(|b| b.num_rows()) + .sum::<usize>(); + assert_eq!(total_rows, 0, "Expected no rows to match the predicate"); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, "pushdown_rows_pruned"); + assert_eq!(metric, 3, "Expected all rows to be pruned"); + + // If we excplicitly allow nulls the rest of the predicate should work + let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32))); + let rt = RoundTrip::new() + .with_schema(table_schema.clone()) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch.clone()]) + .await; + let batches = rt.batches.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----+", + "| c1 | c2 |", + "+----+----+", + "| 1 | |", + "+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, "pushdown_rows_pruned"); + assert_eq!(metric, 2, "Expected all rows to be pruned"); + } + + #[tokio::test] + async fn test_pushdown_with_missing_middle_column() { + let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + let c3 = Arc::new(Int32Array::from(vec![7, 8, 9])); + + let file_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c3", DataType::Int32, true), + ])); + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c2", DataType::Utf8, true), + Field::new("c3", DataType::Int32, true), + ])); + + let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap(); + + // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`, + // the default behavior is to fill in missing columns with nulls. + // Thus this predicate will come back as false. 
+ let filter = col("c2").eq(lit("abc")); + let rt = RoundTrip::new() + .with_schema(table_schema.clone()) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch.clone()]) + .await; + let total_rows = rt + .batches + .unwrap() + .iter() + .map(|b| b.num_rows()) + .sum::<usize>(); + assert_eq!(total_rows, 0, "Expected no rows to match the predicate"); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, "pushdown_rows_pruned"); + assert_eq!(metric, 3, "Expected all rows to be pruned"); + + // If we excplicitly allow nulls the rest of the predicate should work + let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32))); + let rt = RoundTrip::new() + .with_schema(table_schema.clone()) + .with_predicate(filter.clone()) + .with_pushdown_predicate() + .round_trip(vec![batch.clone()]) + .await; + let batches = rt.batches.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----+----+", + "| c1 | c2 | c3 |", + "+----+----+----+", + "| 1 | | 7 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let metrics = rt.parquet_exec.metrics().unwrap(); + let metric = get_value(&metrics, "pushdown_rows_pruned"); + assert_eq!(metric, 2, "Expected all rows to be pruned"); + } + + #[tokio::test] + async fn test_pushdown_with_file_column_order_mismatch() { + let c3 = Arc::new(Int32Array::from(vec![7, 8, 9])); + + let file_schema = Arc::new(Schema::new(vec![ + Field::new("c3", DataType::Int32, true), + Field::new("c3", DataType::Int32, true), Review Comment: was it intentional to repeat the "c3" column here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org