zhuqi-lucas opened a new pull request, #13788: URL: https://github.com/apache/datafusion/pull/13788
## Which issue does this PR close?

Closes [#13745](https://github.com/apache/datafusion/issues/13745).

## Rationale for this change

We always limit the partition files by the limit, even when a filter is applied before the limit. As a result, the scan can pick the wrong set of file groups, so the filter runs over too few files before the final limit is applied.

The fix: do not limit the partition files when a filter is present together with the limit.

## What changes are included in this PR?

## Are these changes tested?

Tested with the following code, which generates the input files itself.

```rust
// Imports added for completeness; the paths assume `arrow`, `parquet`,
// `datafusion`, and `tokio` are direct dependencies of the test crate.
use std::fs::{self, File};
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::{FileReader, SerializedFileReader};

#[tokio::main]
pub async fn main() -> Result<()> {
    let output_dir = "/tmp/test_parquet_data";
    let num_files = 120; // Number of Parquet files
    let rows_per_file = 200; // Number of rows per Parquet file

    // Generate the dataset
    generate_test_data(output_dir, num_files, rows_per_file);
    println!("Generated {} Parquet files with {} rows each", num_files, rows_per_file);

    // Dump the last file, which contains the target row
    let file_path = format!("{}/part-{}.parquet", output_dir, num_files - 1);
    let file = File::open(file_path).unwrap();
    let parquet_reader = SerializedFileReader::new(file).unwrap();
    for record in parquet_reader.get_row_iter(None).unwrap() {
        println!("{:?}", record);
    }

    let parquet_options = ParquetReadOptions::new().parquet_pruning(true);

    // Enable filter pushdown into the Parquet scan
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;
    let state = SessionStateBuilder::new().with_config(config).build();
    let ctx = SessionContext::from(state);

    // Filter first, then limit: the only matching row is in the last file
    let df = ctx
        .read_parquet(output_dir, parquet_options)
        .await
        .unwrap()
        .filter(col("a").eq(lit("23asdas23")))
        .unwrap()
        .limit(0, Some(1))
        .unwrap();

    let batch = df.collect().await.unwrap();
    println!("{:?}", batch);
    Ok(())
}

fn generate_test_data(output_dir: &str, num_files: usize, rows_per_file: usize) {
    // Make sure the output directory exists before creating files in it
    fs::create_dir_all(output_dir).unwrap();

    // Define the schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Int32, false),
    ]));

    for file_index in 0..num_files {
        // Generate data for this file
        let mut a_values = Vec::new();
        let mut b_values = Vec::new();
        for row_index in 0..rows_per_file {
            if file_index == num_files - 1 && row_index == rows_per_file / 2 {
                // Add the target row deep in the dataset
                a_values.push("23asdas23".to_string());
                b_values.push(999);
            } else {
                a_values.push(format!("random_{}_{}", file_index, row_index));
                b_values.push((file_index * rows_per_file + row_index) as i32);
            }
        }

        // Create Arrow arrays
        let a_array = Arc::new(StringArray::from(a_values)) as ArrayRef;
        let b_array = Arc::new(Int32Array::from(b_values)) as ArrayRef;

        // Create a record batch
        let batch = RecordBatch::try_new(schema.clone(), vec![a_array, b_array]).unwrap();

        // Write to a Parquet file
        let file_path = format!("{}/part-{}.parquet", output_dir, file_index);
        let file = File::create(file_path).unwrap();
        let props = WriterProperties::builder().build();
        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }
}
```

After this PR, the final `println!("{:?}", batch);` prints the matching row instead of an empty result.

## Are there any user-facing changes?

No
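
To illustrate the rationale in isolation, here is a minimal, dependency-free sketch of the failure mode. It is a toy model and not DataFusion's actual code: `ToyFile`, `toy_files`, `prune_files_by_limit`, `scan`, and the `skip_pruning_when_filtered` flag are hypothetical names invented for this illustration. The sketch assumes file pruning keeps only as many leading files as are needed to cover the limit by row count, which is the behavior the rationale describes.

```rust
/// A toy "file": just the values of column `a`. Its row count stands in
/// for the row-count statistics a real scan would consult.
struct ToyFile {
    rows: Vec<String>,
}

/// Build `num_files` files; only the last one contains `target`.
fn toy_files(num_files: usize, rows_per_file: usize, target: &str) -> Vec<ToyFile> {
    (0..num_files)
        .map(|file_index| ToyFile {
            rows: (0..rows_per_file)
                .map(|row_index| {
                    if file_index == num_files - 1 && row_index == rows_per_file / 2 {
                        target.to_string()
                    } else {
                        format!("random_{}_{}", file_index, row_index)
                    }
                })
                .collect(),
        })
        .collect()
}

/// Keep only the leading files needed to cover `limit` rows, judged by
/// row counts alone (hypothetical stand-in for limit-based file pruning).
fn prune_files_by_limit(files: &[ToyFile], limit: usize) -> &[ToyFile] {
    let mut covered = 0;
    for (i, f) in files.iter().enumerate() {
        covered += f.rows.len();
        if covered >= limit {
            return &files[..=i];
        }
    }
    files
}

/// Scan with an optional equality filter on `a`, then apply the limit.
/// With `skip_pruning_when_filtered == false`, files are pruned by the
/// limit even though a filter runs afterwards (the reported behavior).
fn scan(
    files: &[ToyFile],
    filter_eq: Option<&str>,
    limit: usize,
    skip_pruning_when_filtered: bool,
) -> Vec<String> {
    let candidates = if filter_eq.is_some() && skip_pruning_when_filtered {
        files
    } else {
        prune_files_by_limit(files, limit)
    };
    candidates
        .iter()
        .flat_map(|f| f.rows.iter())
        .filter(|row| filter_eq.map_or(true, |v| row.as_str() == v))
        .take(limit)
        .cloned()
        .collect()
}

fn main() {
    let files = toy_files(120, 200, "23asdas23");
    // Pruning by limit first keeps only file 0, so the filter finds nothing.
    println!("{:?}", scan(&files, Some("23asdas23"), 1, false)); // prints []
    // Skipping the pruning when a filter is present finds the row.
    println!("{:?}", scan(&files, Some("23asdas23"), 1, true)); // prints ["23asdas23"]
}
```

Without a filter, pruning by the limit is still safe in this model, because any row from the first file satisfies the query; only the combination of filter and limit needs every file to remain a candidate.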
