alamb commented on code in PR #16133:
URL: https://github.com/apache/datafusion/pull/16133#discussion_r2103082309
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -909,30 +954,34 @@ mod tests {
     #[tokio::test]
     async fn evolved_schema_column_type_filter_ints() {
         // The table and filter have a common data type, but the file schema differs
-        let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
-        let batch = create_batch(vec![("c1", c1.clone())]);
+        let table_schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int8, false)]));

-        let schema =
+        let file_schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
+        let file_c1: ArrayRef = Arc::new(UInt64Array::from(vec![Some(1), Some(2)]));
+        let file_batch = create_batch(vec![("c1", file_c1.clone())]);

         // Predicate should prune all row groups
-        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
+        let filter = col("c1").eq(lit(ScalarValue::Int8(Some(5))));

Review Comment:
   This change makes sense to me.



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -200,26 +210,43 @@ mod tests {
     /// run the test, returning the `RoundTripResult`
     async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
-        let file_schema = match &self.schema {
+        self.round_trip_with_file_batches(batches, None).await
+    }
+
+    /// run the test, returning the `RoundTripResult`
+    /// If your table schema is different from file schema, you may need to specify the `file_batches` with the file schema
+    /// Or the file schema in the parquet source will be table schema, see `store_parquet` for detail
+    async fn round_trip_with_file_batches(
+        &self,
+        batches: Vec<RecordBatch>,
+        file_batches: Option<Vec<RecordBatch>>,
+    ) -> RoundTripResult {
+        let batches_schema =
+            Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()));
+        let file_schema = match &self.physical_file_schema {
             Some(schema) => schema,
-            None => &Arc::new(
-                Schema::try_merge(
-                    batches.iter().map(|b| b.schema().as_ref().clone()),
-                )
-                .unwrap(),
-            ),
+            None => &Arc::new(batches_schema.as_ref().unwrap().clone()),
         };
         let file_schema = Arc::clone(file_schema);

+        let table_schema = match &self.logical_file_schema {
+            Some(schema) => schema,
+            None => &Arc::new(batches_schema.as_ref().unwrap().clone()),
+        };
+
         // If testing with page_index_predicate, write parquet
         // files with multiple pages
         let multi_page = self.page_index_predicate;
-        let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();

Review Comment:
   I am a little confused about this -- it makes more sense to me that the batches that are written to the file define the file schema. When the file is read back, the schema used may be different, because the table may have a different schema.

   I think the key difference being tested is when the table schema is different from the file schema.
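A minimal, standalone sketch of the split discussed above, using only ordinary arrow-rs construction rather than the PR's actual test harness: the batch that is written carries the file schema, while the table schema is a separate schema applied when the file is read back. The closing comment names `round_trip_with_file_batches`, the helper shown in the diff; everything else (including `main`) is purely illustrative.

use std::sync::Arc;

use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    // The data actually written to the parquet file: c1 is UInt64 here, so the
    // file schema is UInt64 regardless of what the table declares.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "c1",
        DataType::UInt64,
        false,
    )]));
    let file_c1: ArrayRef = Arc::new(UInt64Array::from(vec![Some(1u64), Some(2)]));
    let file_batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![file_c1])?;

    // The schema the table (and therefore the filter expression) uses when the
    // file is read back; it intentionally differs from the file schema.
    let table_schema = Arc::new(Schema::new(vec![Field::new(
        "c1",
        DataType::Int8,
        false,
    )]));

    // The two schemas disagree only in the column's data type.
    assert_ne!(file_batch.schema(), table_schema);

    // In the harness from this diff, `file_batch` would be passed as the
    // `file_batches` argument of `round_trip_with_file_batches(...)`, while
    // `table_schema` would be configured on the round-trip builder, exercising
    // the table-schema-differs-from-file-schema path described above.
    Ok(())
}

Under that reading, the written batches alone define the file schema, and the table-versus-file difference shows up only in the schema the reader is given.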