This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 391f301efd Sort filenames when reading parquet to ensure consistent 
schema (#6629)
391f301efd is described below

commit 391f301efdd37cbacbf11bf9db2d335b21a53a57
Author: Thomas Cameron <[email protected]>
AuthorDate: Mon Dec 11 23:17:11 2023 +0900

    Sort filenames when reading parquet to ensure consistent schema (#6629)
    
    * update
    
    * FIXed
    
    * add  parquet
    
    * update
    
    * update
    
    * update
    
    * try2
    
    * update
    
    * update
    
    * Add comments
    
    * cargo fmt
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../core/src/datasource/file_format/parquet.rs     | 85 ++++++++++++++++++++--
 1 file changed, 80 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 09e54558f1..9db320fb9d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -164,6 +164,16 @@ fn clear_metadata(
     })
 }
 
+async fn fetch_schema_with_location(
+    store: &dyn ObjectStore,
+    file: &ObjectMeta,
+    metadata_size_hint: Option<usize>,
+) -> Result<(Path, Schema)> {
+    let loc_path = file.location.clone();
+    let schema = fetch_schema(store, file, metadata_size_hint).await?;
+    Ok((loc_path, schema))
+}
+
 #[async_trait]
 impl FileFormat for ParquetFormat {
     fn as_any(&self) -> &dyn Any {
@@ -176,13 +186,32 @@ impl FileFormat for ParquetFormat {
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
-        let schemas: Vec<_> = futures::stream::iter(objects)
-            .map(|object| fetch_schema(store.as_ref(), object, 
self.metadata_size_hint))
+        let mut schemas: Vec<_> = futures::stream::iter(objects)
+            .map(|object| {
+                fetch_schema_with_location(
+                    store.as_ref(),
+                    object,
+                    self.metadata_size_hint,
+                )
+            })
             .boxed() // Workaround 
https://github.com/rust-lang/rust/issues/64552
             .buffered(state.config_options().execution.meta_fetch_concurrency)
             .try_collect()
             .await?;
 
+        // Schema inference adds fields based the order they are seen
+        // which depends on the order the files are processed. For some
+        // object stores (like local file systems) the order returned from list
+        // is not deterministic. Thus, to ensure deterministic schema inference
+        // sort the files first.
+        // https://github.com/apache/arrow-datafusion/pull/6629
+        schemas.sort_by(|(location1, _), (location2, _)| 
location1.cmp(location2));
+
+        let schemas = schemas
+            .into_iter()
+            .map(|(_, schema)| schema)
+            .collect::<Vec<_>>();
+
         let schema = if self.skip_metadata(state.config_options()) {
             Schema::try_merge(clear_metadata(schemas))
         } else {
@@ -1124,12 +1153,21 @@ pub(crate) mod test_util {
         batches: Vec<RecordBatch>,
         multi_page: bool,
     ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
+        // we need the tmp files to be sorted as some tests rely on the how 
the returning files are ordered
+        // https://github.com/apache/arrow-datafusion/pull/6629
+        let tmp_files = {
+            let mut tmp_files: Vec<_> = (0..batches.len())
+                .map(|_| NamedTempFile::new().expect("creating temp file"))
+                .collect();
+            tmp_files.sort_by(|a, b| a.path().cmp(b.path()));
+            tmp_files
+        };
+
         // Each batch writes to their own file
         let files: Vec<_> = batches
             .into_iter()
-            .map(|batch| {
-                let mut output = NamedTempFile::new().expect("creating temp 
file");
-
+            .zip(tmp_files.into_iter())
+            .map(|(batch, mut output)| {
                 let builder = WriterProperties::builder();
                 let props = if multi_page {
                     builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
@@ -1155,6 +1193,7 @@ pub(crate) mod test_util {
             .collect();
 
         let meta: Vec<_> = 
files.iter().map(local_unpartitioned_file).collect();
+
         Ok((meta, files))
     }
 
@@ -1254,6 +1293,42 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn is_schema_stable() -> Result<()> {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+
+        let batch1 =
+            RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", 
c1.clone())])
+                .unwrap();
+        let batch2 =
+            RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", 
c2.clone())])
+                .unwrap();
+
+        let store = Arc::new(LocalFileSystem::new()) as _;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
+
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let format = ParquetFormat::default();
+        let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
+
+        let order: Vec<_> = ["a", "b", "c", "d"]
+            .into_iter()
+            .map(|i| i.to_string())
+            .collect();
+        let coll: Vec<_> = schema
+            .all_fields()
+            .into_iter()
+            .map(|i| i.name().to_string())
+            .collect();
+        assert_eq!(coll, order);
+
+        Ok(())
+    }
+
     #[derive(Debug)]
     struct RequestCountingObjectStore {
         inner: Arc<dyn ObjectStore>,

Reply via email to