This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 391f301efd Sort filenames when reading parquet to ensure consistent
schema (#6629)
391f301efd is described below
commit 391f301efdd37cbacbf11bf9db2d335b21a53a57
Author: Thomas Cameron <[email protected]>
AuthorDate: Mon Dec 11 23:17:11 2023 +0900
Sort filenames when reading parquet to ensure consistent schema (#6629)
* update
* FIXed
* add parquet
* update
* update
* update
* try2
* update
* update
* Add comments
* cargo fmt
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../core/src/datasource/file_format/parquet.rs | 85 ++++++++++++++++++++--
1 file changed, 80 insertions(+), 5 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 09e54558f1..9db320fb9d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -164,6 +164,16 @@ fn clear_metadata(
})
}
+/// Fetches the schema of `file` and returns it paired with `file.location`.
+async fn fetch_schema_with_location(
+    store: &dyn ObjectStore,
+    file: &ObjectMeta,
+    metadata_size_hint: Option<usize>,
+) -> Result<(Path, Schema)> {
+    let fetched = fetch_schema(store, file, metadata_size_hint).await?;
+    Ok((file.location.clone(), fetched))
+}
+
#[async_trait]
impl FileFormat for ParquetFormat {
fn as_any(&self) -> &dyn Any {
@@ -176,13 +186,32 @@ impl FileFormat for ParquetFormat {
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
- let schemas: Vec<_> = futures::stream::iter(objects)
- .map(|object| fetch_schema(store.as_ref(), object,
self.metadata_size_hint))
+ let mut schemas: Vec<_> = futures::stream::iter(objects)
+ .map(|object| {
+ fetch_schema_with_location(
+ store.as_ref(),
+ object,
+ self.metadata_size_hint,
+ )
+ })
.boxed() // Workaround
https://github.com/rust-lang/rust/issues/64552
.buffered(state.config_options().execution.meta_fetch_concurrency)
.try_collect()
.await?;
+ // Schema inference adds fields based the order they are seen
+ // which depends on the order the files are processed. For some
+ // object stores (like local file systems) the order returned from list
+ // is not deterministic. Thus, to ensure deterministic schema inference
+ // sort the files first.
+ // https://github.com/apache/arrow-datafusion/pull/6629
+ schemas.sort_by(|(location1, _), (location2, _)|
location1.cmp(location2));
+
+ let schemas = schemas
+ .into_iter()
+ .map(|(_, schema)| schema)
+ .collect::<Vec<_>>();
+
let schema = if self.skip_metadata(state.config_options()) {
Schema::try_merge(clear_metadata(schemas))
} else {
@@ -1124,12 +1153,21 @@ pub(crate) mod test_util {
batches: Vec<RecordBatch>,
multi_page: bool,
) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
+ // we need the tmp files to be sorted as some tests rely on the how
the returning files are ordered
+ // https://github.com/apache/arrow-datafusion/pull/6629
+ let tmp_files = {
+ let mut tmp_files: Vec<_> = (0..batches.len())
+ .map(|_| NamedTempFile::new().expect("creating temp file"))
+ .collect();
+ tmp_files.sort_by(|a, b| a.path().cmp(b.path()));
+ tmp_files
+ };
+
// Each batch writes to their own file
let files: Vec<_> = batches
.into_iter()
- .map(|batch| {
- let mut output = NamedTempFile::new().expect("creating temp
file");
-
+ .zip(tmp_files.into_iter())
+ .map(|(batch, mut output)| {
let builder = WriterProperties::builder();
let props = if multi_page {
builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
@@ -1155,6 +1193,7 @@ pub(crate) mod test_util {
.collect();
let meta: Vec<_> =
files.iter().map(local_unpartitioned_file).collect();
+
Ok((meta, files))
}
@@ -1254,6 +1293,42 @@ mod tests {
Ok(())
}
+    /// Schema inference must produce the same field order no matter how the files are listed.
+    #[tokio::test]
+    async fn is_schema_stable() -> Result<()> {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let batch1 = RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", c1)])?;
+        let batch2 = RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", c2)])?;
+
+        let store = Arc::new(LocalFileSystem::new()) as _;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
+
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let format = ParquetFormat::default();
+
+        // `store_parquet` already returns files sorted by path and schemas are fetched
+        // through an order-preserving `buffered` stream, so only the reversed listing
+        // actually checks that `infer_schema` sorts by location before merging.
+        let mut reversed_meta = meta.clone();
+        reversed_meta.reverse();
+
+        for objects in [&meta, &reversed_meta] {
+            let schema = format.infer_schema(&ctx, &store, objects).await?;
+            let names: Vec<_> = schema
+                .all_fields()
+                .into_iter()
+                .map(|f| f.name().as_str())
+                .collect();
+            assert_eq!(names, ["a", "b", "c", "d"]);
+        }
+
+        Ok(())
+    }
+
#[derive(Debug)]
struct RequestCountingObjectStore {
inner: Arc<dyn ObjectStore>,